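import scala.concurrent.duration._
import monix.eval.Task
import monix.reactive.Consumer

monixObservable
  // Process one message at a time so offsets stay in order.
  .mapAsync(1) {
    case CommitableMsg(MsgA(a), offset) =>
      someBusinessTask(a).map(_ => offset)
    case CommitableMsg(_, offset) =>
      // Nothing to do for other message types; pass the offset through.
      Task.now(offset)
  }
  // Emit a batch every second, or sooner once 100 offsets have accumulated.
  .bufferTimedAndCounted(1.second, 100)
  // A time window with no messages yields an empty batch; skip it.
  .filter(_.nonEmpty)
  // Commit only the last offset of each batch.
  .mapAsync(1)(offsets => Task.fromFuture(offsets.last.commitOffset))
  // Stop the stream once the injected cancelable is canceled.
  .takeWhileNotCanceled(injectedCancellable)
  .consumeWith(Consumer.complete)
  .runAsync(monix.execution.Scheduler.computation())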