Last active
February 2, 2016 15:00
-
-
Save dvtomas/e9288823b95f3f57d753 to your computer and use it in GitHub Desktop.
crazy Monix operator producerConsumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package info.td.common.monix.impl | |
import java.util.concurrent.Semaphore | |
import info.td.common.monix.ProducerConsumerStrategy | |
import monifu.reactive.Ack.{Cancel, Continue} | |
import monifu.reactive.observers.SynchronousSubscriber | |
import monifu.reactive.{Ack, Subscriber} | |
import scala.annotation.tailrec | |
import scala.util.control.NonFatal | |
import scala.util.{Failure, Success, Try} | |
/**
 * A generalized solution to the Consumer-Producer problem.
 *
 * The upstream source is regarded as a producer generating items. Those
 * are immediately buffered into a shared structure. In parallel, items are
 * being retrieved (consumed) from this shared structure and handed over to a
 * (potentially slow) downstream subscriber (consumer).
 *
 * In the classical producer-consumer the producer generates items, those are
 * buffered, and the very same items in the very same order are being consumed
 * by the consumer.
 *
 * This implementation uses a more general notion of what a "buffer" is. It
 * can be just any shared structure able to produce and consume items. The
 * type and the number of the input and output items don't even have to be
 * the same. How the producing/consuming on this structure behaves is
 * specified via an instance of `ProducerConsumerStrategy`, that is handed to
 * the operator at creation time. This allows for easy implementing of the
 * classical producer-consumer problem, but also various MRU caches,
 * aggregation strategies, bufferIntrospective and observeLatestOn operators
 * etc.
 *
 * Concurrency: `state` is only read or replaced while holding `lock`, and the
 * decisions "the push loop stops" / "the stream terminates" are also taken
 * under `lock`, so a producer can never add an item unnoticed by the consumer.
 *
 * NOTE: bufferIntrospective has a separate, more efficient implementation. In
 * case you need to only aggregate things that have accumulated during the
 * latest `onNext` of the downstream subscriber, it is better to use that one.
 *
 * @tparam T the type of the producer (upstream) elements
 * @tparam R the type of the consumer (downstream) elements
 */
private[monix] final class ProducerConsumerSubscriber[-T, +R](underlying: Subscriber[R], strategy: ProducerConsumerStrategy[T, R])
  extends SynchronousSubscriber[T] {
  self ⇒

  implicit val scheduler = underlying.scheduler

  // Mask used to force an asynchronous boundary every `batchSize` synchronous
  // emissions (the `&` acts as a modulo only if batchSize is a power of two — TODO confirm)
  private[this] val batchSizeModulus = scheduler.env.batchSize - 1

  // Set by onError, by onNext when `strategy.add` fails, or by pushLoop when
  // `strategy.get` fails; null means "no error"
  @volatile private[this] var errorThrown: Throwable = null

  // Set by onError / onComplete, or by onNext when `strategy.add` fails
  @volatile private[this] var upstreamIsComplete = false

  // Modified only by the consumer (pushLoop); once true, nothing more is emitted downstream
  @volatile private[this] var downstreamIsDone = false

  // Binary semaphore guarding `state` and the push-loop / termination decisions
  private[this] val lock = new Semaphore(1)

  @volatile private[this] var state = strategy.empty

  // True while a push loop is scheduled or running; read and cleared under `lock`
  @volatile private[this] var isPushLoopRunning = false

  /**
   * Adds `elem` to the shared structure and wakes the consumer.
   *
   * If `strategy.add` throws, the buffered state is discarded, the error is
   * recorded to be signalled downstream, and upstream is cancelled.
   */
  def onNext(elem: T): Ack = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      lock.acquire()
      val (maybeError, updatedState) =
        try {
          (None, strategy.add(state, elem))
        } catch {
          case NonFatal(error) ⇒ (Some(error), strategy.empty)
        }
      state = updatedState
      maybeError match {
        case None ⇒
          pushToConsumer() // releases the lock
          Continue
        case Some(error) ⇒
          if (!upstreamIsComplete && !downstreamIsDone) {
            errorThrown = error
            upstreamIsComplete = true
            pushToConsumer() // releases the lock
          } else {
            // Nothing left to signal, but the lock acquired above must still be released
            lock.release()
          }
          Cancel
      }
    } else {
      Cancel
    }
  }

  /**
   * Records the upstream error; it is forwarded downstream once the consumer
   * has drained the shared structure. Assumes upstream respects the Rx grammar
   * (no concurrent onNext/onError/onComplete) — the flag checks are not atomic.
   */
  def onError(ex: Throwable) = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      errorThrown = ex
      upstreamIsComplete = true
      lock.acquire()
      pushToConsumer()
    }
  }

  /**
   * Marks upstream as complete; `onComplete` is forwarded downstream once the
   * consumer has drained the shared structure.
   */
  def onComplete() = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      upstreamIsComplete = true
      lock.acquire()
      pushToConsumer()
    }
  }

  /**
   * Schedules a push loop unless one is already running.
   * Must be called while holding `lock`; always releases it.
   */
  private[this] def pushToConsumer(): Unit = {
    val shouldRunPushLoop = !isPushLoopRunning
    if (shouldRunPushLoop) {
      isPushLoopRunning = true
    }
    lock.release()
    if (shouldRunPushLoop) {
      scheduler.execute(() ⇒ pushLoop(0))
    }
  }

  /** Restarts the push loop after an asynchronous acknowledgement. */
  private[this] def rescheduled(): Unit = {
    pushLoop(0)
  }

  /**
   * Consumer side: repeatedly takes the next item from the shared structure
   * and hands it to the downstream subscriber, honoring its back-pressure.
   * When the structure is empty the loop stops; if upstream has finished (or
   * an error was recorded) it emits the terminal event instead.
   *
   * @param syncIndex number of items emitted synchronously in the current batch
   */
  @tailrec
  private[this] def pushLoop(syncIndex: Int): Unit = {
    if (!downstreamIsDone) {
      lock.acquire()
      val maybeItemToEmit = Try(strategy.get(state)) match {
        case Success((Some(item), newState)) ⇒
          state = newState
          Some(item)
        case Success((None, newState)) ⇒
          state = newState
          None
        case Failure(error) ⇒
          errorThrown = error
          None
      }
      // Both decisions below must be taken while still holding the lock.
      // Clearing `isPushLoopRunning` after the release would let a producer add
      // an item, see the loop as still running and skip scheduling a new one,
      // while this loop exits anyway, leaving the item stranded in `state`.
      val shouldTerminate =
        if (maybeItemToEmit.isDefined) false
        else {
          isPushLoopRunning = false
          val finished = !downstreamIsDone && (upstreamIsComplete || (errorThrown ne null))
          if (finished) {
            // Claimed under the lock, so only one loop ever emits the terminal event
            downstreamIsDone = true
            state = strategy.empty // for GC purposes
          }
          finished
        }
      lock.release()

      maybeItemToEmit match {
        case Some(itemToEmit) ⇒
          val ack = underlying onNext itemToEmit
          // 0 forces the asynchronous path: either the ack is not yet known,
          // or a batch of synchronous emissions has just been completed
          val nextSyncIndex =
            if (!ack.isCompleted) 0
            else (syncIndex + 1) & batchSizeModulus

          if (nextSyncIndex > 0) {
            if (ack == Continue || ack.value.get == Continue.IsSuccess) {
              // process next
              pushLoop(nextSyncIndex)
            } else if (ack == Cancel || ack.value.get == Cancel.IsSuccess) {
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
            } else if (ack.value.get.isFailure) {
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(ack.value.get.failed.get)
            } else {
              // never happens
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(new MatchError(ack.value.get.toString))
            }
          } else ack.onComplete {
            case Continue.IsSuccess ⇒
              // re-run loop (in different thread)
              rescheduled()
            case Cancel.IsSuccess ⇒
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
            case Failure(ex) ⇒
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(ex)
            case other ⇒
              // never happens, but to appease the Scala compiler
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(new MatchError(s"$other"))
          }

        case None ⇒
          if (shouldTerminate) {
            if (errorThrown ne null) {
              underlying.onError(errorThrown)
            } else {
              underlying.onComplete()
            }
          }
      }
    }
  }
}
/** This is the shared structure that describes the producing and consuming
  * behavior of a particular instance of the `producerConsumer` operator.
  *
  * The state is threaded through explicitly: each operation receives the
  * current state and returns the next one.
  *
  * @tparam T The type of the input (observed, upstream, producer) elements
  * @tparam R The type of the output (generated, downstream, consumer) elements
  */
abstract class ProducerConsumerStrategy[-T, +R] {
  /**
   * The type of the state internal to this particular
   * ProducerConsumerStrategy
   */
  type StateType

  /**
   * This method should generate a new state from an old state and an element
   * that has just been produced. If this method throws an exception,
   * `producerConsumer` will signal this to the downstream subscriber via
   * `onError`.
   *
   * @param state The old state
   * @param element A newly produced element that is to be added to this shared structure
   * @return New state, probably somehow reflecting that a new element has been added.
   */
  def add(state: StateType, element: T): StateType

  /**
   * This method is used by the consumer to retrieve the next element from the
   * shared structure. It should return either an element and a new state
   * (reflecting that the item has just been removed), or None if the old state
   * already represents an empty structure.
   *
   * If this method throws an exception, `producerConsumer` will signal this
   * to the downstream subscriber via `onError`.
   *
   * @param state The old state
   * @return Some(item) retrieved from the old state if any, along with the new state
   */
  def get(state: StateType): (Option[R], StateType)

  /**
   * The state representing an empty shared structure. `producerConsumer`
   * starts from it, falls back to it when `add` throws, and resets to it once
   * the stream has terminated.
   *
   * @return The empty state
   */
  def empty: StateType
}
/* ------------------------- the unit tests --------------------------- */ | |
package info.td.common.monix | |
import java.util.Date | |
import info.td.common.monix.MonixHelpers._ | |
import info.td.common.monix.testing.MonixTestHelpers | |
import monifu.reactive.{Ack, Observable} | |
import utest._ | |
import utest.framework.TestSuite | |
import scala.concurrent.duration._ | |
import scala.util.Random | |
object ProducerConsumerTest extends TestSuite {
  val tests = TestSuite {

    // FIFO strategy: items are consumed one at a time, in production order
    object GetByOneProducerConsumerStrategy extends ProducerConsumerStrategy[Int, Int] {
      type StateType = List[Int]

      override def add(elements: StateType, element: Int) = elements :+ element

      override def get(elements: StateType) = {
        elements match {
          case element :: tail ⇒ (Some(element), tail)
          case Nil ⇒ (None, Nil)
        }
      }

      def empty = Nil
    }

    // Aggregating strategy: each consumption drains everything buffered so far
    object GetAllProducerConsumerStrategy extends ProducerConsumerStrategy[Int, List[Int]] {
      type StateType = List[Int]

      override def add(elements: StateType, element: Int) = elements :+ element

      override def get(elements: StateType) = {
        if (elements.isEmpty) {
          (None, Nil)
        } else {
          (Some(elements), Nil)
        }
      }

      override def empty = Nil
    }

    val error: RuntimeException = new RuntimeException("Test error")

    // Strategy whose `add` always fails with `error`
    object ErrorWhenAddingProducerConsumerStrategy extends ProducerConsumerStrategy[Int, Any] {
      type StateType = Any

      override def add(elements: StateType, element: Int) = {
        throw error
      }

      override def get(state: StateType) = (None, ())

      override def empty: Unit = ()
    }

    // Strategy whose `get` always fails with `error`
    object ErrorWhenGettingProducerConsumerStrategy extends ProducerConsumerStrategy[Int, Any] {
      type StateType = Any

      override def add(elements: StateType, element: Int) = ()

      override def get(state: StateType) = {
        throw error
      }

      override def empty: Unit = ()
    }

    "getByOne" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 20.milli
      val result: Vector[Int] = MonixTestHelpers
        .delaysBetweenEmissions(Seq(100.milli, 100.milli, 200.milli))
        .map(_.toInt)
        .producerConsumer(GetByOneProducerConsumerStrategy)
        .doWork(_ ⇒ schedulerProvider.advanceTimeBy(timeSlice * 3 + timeSlice / 5))
        .toVectorWithAction(timeSlice * 15) {
          schedulerProvider.advanceTimeBy(timeSlice * 15)
        }(schedulerProvider.defaultScheduler)
      val expectedResult = Vector(0, 1, 2)
      assert(result == expectedResult)
    }

    "getAll" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 20.milli
      val result: Vector[List[Int]] =
        Observable
          .intervalAtFixedRate(timeSlice)
          .map(_.toInt)
          .take(10)
          .producerConsumer(GetAllProducerConsumerStrategy)
          .doWork(_ ⇒ schedulerProvider.advanceTimeBy(timeSlice * 3 + timeSlice / 5))
          .toVectorWithAction(timeSlice * 15) {
            schedulerProvider.advanceTimeBy(timeSlice * 15)
          }(schedulerProvider.defaultScheduler)
      assert(result.size == 4)
      assert(result(0) == List(0))
      assert(result(1) == List(1, 2, 3))
      assert(result(2) == List(4, 5, 6))
      assert(result(3) == List(7, 8, 9))
    }

    "getAll many elements slow subscriber" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 2
      val numberOfElements = 1000
      val r = new Random(new Date().getTime)
      val xsList =
        MonixTestHelpers
          .delaysBetweenEmissions(Seq.tabulate(numberOfElements)(_ ⇒ r.nextInt(timeSlice).milli))
          .map(_.toInt)
          .producerConsumer(GetAllProducerConsumerStrategy)
          // same form as the sibling tests: advance time once per consumed element
          .doWork(_ ⇒ schedulerProvider.advanceTimeBy(r.nextInt(timeSlice * 3).milli))
          .toVectorWithAction((numberOfElements * timeSlice).milli) {
            schedulerProvider.advanceTimeBy(100.milli)
          }(schedulerProvider.defaultScheduler)
      val flattenedResult = xsList.flatten
      val expectedResult = (0 until numberOfElements).toVector
      assert(flattenedResult == expectedResult)
    }

    "getAll many elements fast subscriber" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 2
      val numberOfElements = 1000
      val r = new Random(new Date().getTime)
      val xsList =
        MonixTestHelpers
          .delaysBetweenEmissions(Seq.tabulate(numberOfElements)(_ ⇒ r.nextInt(timeSlice).milli))
          .map(_.toInt)
          .producerConsumer(GetAllProducerConsumerStrategy)
          .toVectorWithAction((numberOfElements * timeSlice).milli) {
            schedulerProvider.advanceTimeBy((numberOfElements * timeSlice).milli)
          }(schedulerProvider.defaultScheduler)
      val flattenedResult = xsList.flatten
      val expectedResult = (0 until numberOfElements).toVector
      assert(flattenedResult == expectedResult)
    }

    // Subscribes to `o`, advances time by `time` and checks that `error` reached onError
    def assertErrorEmitted[T](o: Observable[T], time: FiniteDuration)(schedulerProvider: SchedulerProvider): Unit = {
      var errorThrown: Throwable = null
      o.subscribe(_ ⇒ Ack.Continue, e ⇒ errorThrown = e)(schedulerProvider.defaultScheduler)
      schedulerProvider.advanceTimeBy(time)
      assert(errorThrown == error)
    }

    "error producing ProducerConsumer" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      assertErrorEmitted(Observable
        .from(1, 2, 3)
        .producerConsumer(ErrorWhenAddingProducerConsumerStrategy),
        100.milli
      )(schedulerProvider)
    }

    "get throwing an error is handled correctly " - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      assertErrorEmitted(Observable
        .from(1, 2, 3)
        .producerConsumer(ErrorWhenGettingProducerConsumerStrategy),
        100.milli
      )(schedulerProvider)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment