package info.td.common.monix.impl

import monifu.concurrent.atomic.padded.Atomic
import monifu.reactive.Ack.{Cancel, Continue}
import monifu.reactive.exceptions.BufferOverflowException
import monifu.reactive.observers.{BufferedSubscriber, SynchronousSubscriber}
import monifu.reactive.{Ack, Subscriber}
import scala.annotation.tailrec
import scala.util.Failure
import scala.util.control.NonFatal

/**
 * This operator performs buffering of the input elements depending on the ability
 * of the (usually slow) consumer to process items.
 *
 * When the downstream subscriber is not busy processing anything and an input element
 * is received from upstream, the downstream consumer is immediately handed a collection
 * containing just this element.
 *
 * When the downstream subscriber is busy processing a collection of input elements,
 * this operator buffers the incoming items. As soon as the subscriber finishes
 * processing the last batch, it is fed the new collection that has accumulated in
 * the meantime. If more than one element has arrived, the most recent one is the
 * last one in the collection.
 *
 * If bufferSize is 0, the buffer is unbounded. If bufferSize > 0 and more elements
 * than bufferSize arrive during one iteration of downstream processing, this
 * operator signals an error.
 *
 * As for the implementation, it is basically a stolen and slightly modified
 * SimpleBufferedSubscriber from Monix 1.0. (A usage sketch is appended at the
 * end of this file.)
 */
private[monix] final class BufferIntrospectiveSubscriber[-T](underlying: Subscriber[Seq[T]], bufferSize: Int = 0)
  extends BufferedSubscriber[T] with SynchronousSubscriber[T] { self ⇒

  require(bufferSize >= 0, "bufferSize must be a non-negative number")

  implicit val scheduler = underlying.scheduler

  private[this] val buffer = Atomic(Vector.empty[T])
  private[this] val batchSizeModulus = scheduler.env.batchSize - 1

  // to be modified only in onError, before upstreamIsComplete
  @volatile private[this] var errorThrown: Throwable = null
  // to be modified only in onError / onComplete
  @volatile private[this] var upstreamIsComplete = false
  // to be modified only by the consumer
  @volatile private[this] var downstreamIsDone = false
  // for enforcing non-concurrent updates; it is the number of items that remain
  // to be pushed to the subscriber
  private[this] val itemsToPush = Atomic(0)

  def onNext(elem: T): Ack = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      try {
        buffer.transform(_ :+ elem)
        pushToConsumer()
        Continue
      }
      catch {
        case NonFatal(ex) ⇒
          onError(ex)
          Cancel
      }
    }
    else
      Cancel
  }
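
  // Note: because this class mixes in SynchronousSubscriber, onNext above returns
  // a plain Ack rather than a Future[Ack]; the producer gets back-pressure feedback
  // synchronously, while the actual delivery of the accumulated Seq[T] to
  // `underlying` happens asynchronously in fastLoop, on the scheduler.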

  def onError(ex: Throwable) = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      errorThrown = ex
      upstreamIsComplete = true
      pushToConsumer()
    }
  }

  def onComplete() = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      upstreamIsComplete = true
      pushToConsumer()
    }
  }

  @tailrec
  private[this] def pushToConsumer(): Unit = {
    // For race-condition checking with fastLoop: the number of items that remain
    // to be pushed to the subscriber. If zero, fastLoop is not running and we
    // need to start it up.
    val currentNr = itemsToPush.get

    if (bufferSize == 0) {
      // unbounded branch
      if (!itemsToPush.compareAndSet(currentNr, currentNr + 1)) {
        pushToConsumer()
      } else if (currentNr == 0) {
        scheduler.execute(() ⇒ fastLoop(0, 0))
      }
    }
    else {
      // overflow-triggering branch
      if (currentNr >= bufferSize && !upstreamIsComplete) {
        self.onError(new BufferOverflowException(
          s"Downstream observer is too slow, buffer over capacity with a " +
            s"specified buffer size of $bufferSize"))
      }
      else if (!itemsToPush.compareAndSet(currentNr, currentNr + 1)) {
        pushToConsumer()
      } else if (currentNr == 0) {
        scheduler.execute(() ⇒ fastLoop(0, 0))
      }
    }
  }

  private[this] def rescheduled(processed: Int): Unit = {
    fastLoop(processed, 0)
  }

  @tailrec
  private[this] def fastLoop(processed: Int, syncIndex: Int): Unit = {
    if (!downstreamIsDone) {
      val hasError = errorThrown ne null
      val bufferedElems = buffer.getAndSet(Vector.empty)

      if (bufferedElems.nonEmpty) {
        val ack = underlying.onNext(bufferedElems)
        val nextSyncIndex =
          if (!ack.isCompleted) {
            0
          } else {
            (syncIndex + 1) & batchSizeModulus
          }

        if (nextSyncIndex > 0) {
          if (ack == Continue || ack.value.get == Continue.IsSuccess) {
            // process next
            fastLoop(processed + bufferedElems.size, nextSyncIndex)
          } else if (ack == Cancel || ack.value.get == Cancel.IsSuccess) {
            // ending loop
            downstreamIsDone = true
            itemsToPush.set(0)
          } else if (ack.value.get.isFailure) {
            // ending loop
            downstreamIsDone = true
            itemsToPush.set(0)
            underlying.onError(ack.value.get.failed.get)
          } else {
            // never happens
            downstreamIsDone = true
            itemsToPush.set(0)
            underlying.onError(new MatchError(ack.value.get.toString))
          }
        } else ack.onComplete {
          case Continue.IsSuccess ⇒
            // re-run loop (in a different thread)
            rescheduled(processed + bufferedElems.size)
          case Cancel.IsSuccess ⇒
            // ending loop
            downstreamIsDone = true
            itemsToPush.set(0)
          case Failure(ex) ⇒
            // ending loop
            downstreamIsDone = true
            itemsToPush.set(0)
            underlying.onError(ex)
          case other ⇒
            // never happens, but to appease the Scala compiler
            downstreamIsDone = true
            itemsToPush.set(0)
            underlying.onError(new MatchError(s"$other"))
        }
      } else if (upstreamIsComplete || hasError) {
        // Race-condition check: if upstreamIsComplete=true is visible, then the
        // buffer should be fully published, because there's a clear happens-before
        // relationship between buffer.transform(...) and upstreamIsComplete=true
        if (buffer.get.nonEmpty) {
          fastLoop(processed, syncIndex)
        } else {
          // ending loop
          downstreamIsDone = true
          itemsToPush.set(0)
          buffer.set(Vector.empty) // for GC purposes
          if (errorThrown ne null) {
            underlying.onError(errorThrown)
          } else {
            underlying.onComplete()
          }
        }
      } else {
        val remaining = itemsToPush.decrementAndGet(processed)
        // if items are still pending (i.e. concurrent pushes just happened),
        // then start all over again
        if (remaining > 0) {
          fastLoop(0, syncIndex)
        }
      }
    }
  }
}
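
// ---------------------------------------------------------------------------
// Usage sketch (not part of the original gist): a minimal, hypothetical
// demonstration of wiring the subscriber to a slow downstream consumer.
// The object and value names below are made up for illustration; only
// BufferIntrospectiveSubscriber and the Monifu API come from the source.
// ---------------------------------------------------------------------------
object BufferIntrospectiveSubscriberExample {
  import monifu.concurrent.Implicits.globalScheduler

  import scala.concurrent.Future

  // A downstream consumer that receives whole batches (Seq[Int]) at once.
  val batchConsumer = new Subscriber[Seq[Int]] {
    implicit val scheduler = globalScheduler

    def onNext(batch: Seq[Int]): Future[Ack] = {
      println(s"received batch of ${batch.size} element(s): $batch")
      Continue
    }

    def onError(ex: Throwable): Unit = ex.printStackTrace()
    def onComplete(): Unit = println("completed")
  }

  def main(args: Array[String]): Unit = {
    // bufferSize = 0 (the default) means unbounded buffering: elements that
    // arrive while the consumer is busy are accumulated and later delivered
    // together as one Seq. A positive bufferSize would instead signal
    // BufferOverflowException once the pending count exceeds it.
    val buffered = new BufferIntrospectiveSubscriber[Int](batchConsumer)
    (1 to 10).foreach(buffered.onNext)
    buffered.onComplete()
  }
}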