Last active
February 1, 2016 08:03
-
-
Save dvtomas/e47bce4f8849da9721dc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package info.td.common.monix.impl | |
import monifu.concurrent.atomic.padded.Atomic | |
import monifu.reactive.Ack.{Cancel, Continue} | |
import monifu.reactive.exceptions.BufferOverflowException | |
import monifu.reactive.observers.{BufferedSubscriber, SynchronousSubscriber} | |
import monifu.reactive.{Ack, Subscriber} | |
import scala.annotation.tailrec | |
import scala.util.Failure | |
import scala.util.control.NonFatal | |
/**
 * This operator buffers the input elements depending on the ability of the (usually slow) consumer to process items.
 *
 * When the downstream subscriber is not busy processing anything and an input element is received from upstream, the downstream consumer is immediately handed a collection containing this element.
 *
 * When the downstream subscriber is busy processing a collection of input elements, this operator buffers the incoming items.
 * As soon as the subscriber finishes processing the last batch of input elements, it is fed the new collection that has accumulated in the meantime.
 * If more than one element has arrived, the most recent one is the last one in the collection.
 *
 * If bufferSize is 0, the buffer is unbounded.
 * If bufferSize > 0 and more than bufferSize elements arrive during one iteration of downstream subscriber processing, this operator signals an error.
 *
 * As for the implementation, it is basically a borrowed and slightly modified SimpleBufferedSubscriber from Monix 1.0.
 */
private[monix] final class BufferIntrospectiveSubscriber[-T](underlying: Subscriber[Seq[T]], bufferSize: Int = 0)
  extends BufferedSubscriber[T] with SynchronousSubscriber[T] {
  self =>

  // Zero is a legal value (it selects the unbounded mode), hence "non-negative".
  require(bufferSize >= 0, "bufferSize must be a non-negative number")

  implicit val scheduler = underlying.scheduler

  // Elements received from upstream that have not been handed to `underlying` yet.
  private[this] val buffer = Atomic(Vector.empty[T])

  // Used as a bit mask on the count of consecutive already-completed acks in fastLoop;
  // when the masked counter wraps to zero the loop continues through ack.onComplete
  // instead of recursing directly.
  // NOTE(review): masking only works as a modulus if batchSize is a power of two — confirm.
  private[this] val batchSizeModulus = scheduler.env.batchSize - 1

  // to be modified only in onError, before upstreamIsComplete
  @volatile private[this] var errorThrown: Throwable = null

  // to be modified only in onError / onComplete
  @volatile private[this] var upstreamIsComplete = false

  // to be modified only by consumer
  @volatile private[this] var downstreamIsDone = false

  // for enforcing non-concurrent updates. It is the number of items that are to be pushed to the subscriber
  private[this] val itemsToPush = Atomic(0)

  /**
   * Appends `elem` to the buffer and makes sure the consumer loop is running.
   *
   * @return `Continue` when the element was buffered; `Cancel` when upstream has already
   *         completed, downstream is done, or a non-fatal exception was thrown while
   *         buffering (that exception is also passed to [[onError]]).
   */
  def onNext(elem: T): Ack = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      try {
        // The element is published to the buffer before itemsToPush is incremented.
        buffer.transform(_ :+ elem)
        pushToConsumer()
        Continue
      }
      catch {
        case NonFatal(ex) =>
          onError(ex)
          Cancel
      }
    }
    else
      Cancel
  }

  /**
   * Records `ex`, marks upstream as complete and wakes up the consumer loop.
   * Does nothing if upstream has already completed or downstream is done.
   */
  def onError(ex: Throwable): Unit = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      // errorThrown is written before upstreamIsComplete (both volatile).
      errorThrown = ex
      upstreamIsComplete = true
      pushToConsumer()
    }
  }

  /**
   * Marks upstream as complete and wakes up the consumer loop.
   * Does nothing if upstream has already completed or downstream is done.
   */
  def onComplete(): Unit = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      upstreamIsComplete = true
      pushToConsumer()
    }
  }

  /**
   * Registers one more pending event in `itemsToPush` and starts `fastLoop` on the
   * scheduler when the counter was zero before the increment.
   *
   * When `bufferSize > 0`, the counter has already reached `bufferSize` and upstream is
   * not complete yet, a `BufferOverflowException` is routed through [[onError]] instead
   * of incrementing the counter.
   */
  @tailrec
  private[this] def pushToConsumer(): Unit = {
    // For race condition check with fastLoop. Number of items that remain to be pushed to the subscriber.
    // If zero, fastLoop is not running and we need to start it up.
    val currentNr = itemsToPush.get
    // bufferSize == 0 means unbounded, so the overflow check applies only to bufferSize > 0.
    val overflowed = bufferSize > 0 && currentNr >= bufferSize && !upstreamIsComplete

    if (overflowed) {
      self.onError(new BufferOverflowException(
        s"Downstream observer is too slow, buffer over capacity with a " +
          s"specified buffer size of $bufferSize"))
    } else if (!itemsToPush.compareAndSet(currentNr, currentNr + 1)) {
      // Lost the CAS race, retry with a fresh counter value.
      pushToConsumer()
    } else if (currentNr == 0) {
      scheduler.execute(() => fastLoop(0, 0))
    }
  }

  /** Flags downstream as done and resets the pending counter; done on every path that ends the loop for good. */
  private[this] def markDownstreamDone(): Unit = {
    downstreamIsDone = true
    itemsToPush.set(0)
  }

  /**
   * Ends the loop for a completed acknowledgement that is not `Continue`: a `Cancel` just
   * stops, a failure is additionally forwarded to `underlying.onError`.
   */
  private[this] def stopOnAck(result: scala.util.Try[Ack]): Unit = result match {
    case Cancel.IsSuccess =>
      markDownstreamDone()
    case Failure(ex) =>
      markDownstreamDone()
      underlying.onError(ex)
    case other =>
      // never happens, but to appease the Scala compiler
      markDownstreamDone()
      underlying.onError(new MatchError(s"$other"))
  }

  /**
   * Re-enters `fastLoop` from the asynchronous ack callback with a fresh sync index.
   * Presumably kept as a separate method so that `fastLoop` contains only tail-position
   * self-calls and can stay `@tailrec`.
   */
  private[this] def rescheduled(processed: Int): Unit = {
    fastLoop(processed, 0)
  }

  /**
   * Consumer loop: repeatedly drains the whole buffer and hands it to `underlying.onNext`
   * as a single batch.
   *
   * @param processed number of elements handed downstream since `itemsToPush` was last decremented
   * @param syncIndex masked count of consecutive acknowledgements that were already completed
   */
  @tailrec
  private[this] def fastLoop(processed: Int, syncIndex: Int): Unit = {
    if (!downstreamIsDone) {
      // Read the error flag before draining the buffer.
      val hasError = errorThrown ne null
      val bufferedElems = buffer.getAndSet(Vector.empty)

      if (bufferedElems.nonEmpty) {
        val ack = underlying.onNext(bufferedElems)
        val nextSyncIndex =
          if (!ack.isCompleted) 0
          else (syncIndex + 1) & batchSizeModulus

        if (nextSyncIndex > 0) {
          // The ack is already completed here, so ack.value is defined.
          if (ack == Continue || ack.value.get == Continue.IsSuccess) {
            // process next
            fastLoop(processed + bufferedElems.size, nextSyncIndex)
          } else {
            // ending loop
            stopOnAck(ack.value.get)
          }
        } else ack.onComplete {
          case Continue.IsSuccess =>
            // re-run loop (in different thread)
            rescheduled(processed + bufferedElems.size)
          case result =>
            // ending loop
            stopOnAck(result)
        }
      } else if (upstreamIsComplete || hasError) {
        // Race-condition check: onNext appends to the buffer before upstreamIsComplete can be
        // set, so once upstreamIsComplete=true is visible the buffer should be fully published.
        if (buffer.get.nonEmpty) {
          fastLoop(processed, syncIndex)
        } else {
          // ending loop
          markDownstreamDone()
          buffer.set(Vector.empty) // for GC purposes
          if (errorThrown ne null) {
            underlying.onError(errorThrown)
          } else {
            underlying.onComplete()
          }
        }
      } else {
        val remaining = itemsToPush.decrementAndGet(processed)
        // if the buffer is non-empty (i.e. concurrent modifications just happened)
        // then start all over again
        if (remaining > 0) {
          fastLoop(0, syncIndex)
        }
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment