Skip to content

Instantly share code, notes, and snippets.

@dvtomas
Last active February 2, 2016 15:00
Show Gist options
  • Save dvtomas/e9288823b95f3f57d753 to your computer and use it in GitHub Desktop.
Save dvtomas/e9288823b95f3f57d753 to your computer and use it in GitHub Desktop.
crazy Monix operator producerConsumer
package info.td.common.monix.impl
import java.util.concurrent.Semaphore
import info.td.common.monix.ProducerConsumerStrategy
import monifu.reactive.Ack.{Cancel, Continue}
import monifu.reactive.observers.SynchronousSubscriber
import monifu.reactive.{Ack, Subscriber}
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
/**
 * A generalized solution to the Consumer-Producer problem.
 *
 * The upstream source is regarded as a producer generating items. Those
 * are immediately buffered into a shared structure. In parallel, items are
 * being retrieved (consumed) from this shared structure and handed over to a
 * (potentially slow) downstream subscriber (consumer).
 *
 * In the classical producer-consumer the producer generates items, those are
 * buffered, and the very same items in the very same order are being consumed
 * by the consumer.
 *
 * This implementation uses a more general notion of what a "buffer" is. It
 * can be just any shared structure able to produce and consume items. The
 * type and the number of the input and output items don't even have to be
 * the same. How the producing/consuming on this structure behaves is
 * specified via an instance of `ProducerConsumerStrategy`, that is handed to
 * the operator at creation time. This makes it easy to implement the
 * classical producer-consumer problem, but also various MRU caches,
 * aggregation strategies, bufferIntrospective and observeLatestOn operators,
 * etc.
 *
 * NOTE: bufferIntrospective has a separate, more efficient implementation. In
 * case you need to only aggregate things that have accumulated during the
 * latest `onNext` of the downstream subscriber, it is better to use that one.
 *
 * Concurrency: `state` is only touched while holding `lock`, and
 * `isPushLoopRunning` is read and set to `true` only under it. At most one
 * push loop (the consumer side) runs at a time. The loop decides under the
 * lock whether it goes idle or terminates. So a producer call either hands its
 * work to a loop that is still running, or finds no loop and starts one.
 *
 * @tparam T the type of the producer (upstream) elements
 * @tparam R the type of the consumer (downstream) elements
 */
private[monix] final class ProducerConsumerSubscriber[-T, +R](underlying: Subscriber[R], strategy: ProducerConsumerStrategy[T, R])
  extends SynchronousSubscriber[T] {
  self ⇒

  implicit val scheduler = underlying.scheduler

  // Used as `(syncIndex + 1) & batchSizeModulus`. The mask only acts as a
  // modulo if batchSize is a power of two (presumably guaranteed by the
  // scheduler env; TODO confirm).
  private[this] val batchSizeModulus = scheduler.env.batchSize - 1

  // Terminal error to deliver downstream. Written by onError, by onNext when
  // `strategy.add` fails, and by pushLoop when `strategy.get` fails.
  @volatile private[this] var errorThrown: Throwable = null
  // Set once upstream will produce nothing more: in onError / onComplete, or in
  // onNext when `strategy.add` fails.
  @volatile private[this] var upstreamIsComplete = false
  // Modified only by the push loop (consumer) once downstream must not be
  // signalled any more.
  @volatile private[this] var downstreamIsDone = false
  // Binary semaphore guarding `state` and `isPushLoopRunning`. Producer-side
  // callers acquire it and `pushToConsumer` releases it.
  private[this] val lock = new Semaphore(1)
  @volatile private[this] var state = strategy.empty
  @volatile private[this] var isPushLoopRunning = false

  /**
   * Adds `elem` to the shared structure and makes sure a push loop will emit it.
   *
   * If `strategy.add` throws, the state is reset to `strategy.empty`. The error
   * becomes the terminal error (unless the stream already finished), and
   * `Cancel` is returned.
   */
  def onNext(elem: T): Ack = {
    if (upstreamIsComplete || downstreamIsDone) {
      Cancel
    } else {
      lock.acquire()
      val maybeError =
        try {
          state = strategy.add(state, elem)
          None
        } catch {
          case NonFatal(error) ⇒
            state = strategy.empty
            Some(error)
        }
      maybeError match {
        case None ⇒
          pushToConsumer() // releases the lock
          Continue
        case Some(error) ⇒
          if (!upstreamIsComplete && !downstreamIsDone) {
            errorThrown = error
            upstreamIsComplete = true
            pushToConsumer() // releases the lock
          } else {
            // Nothing else releases the lock on this path; without this every
            // later acquire() would block forever.
            lock.release()
          }
          Cancel
      }
    }
  }

  /**
   * Records `ex` as the terminal error. Downstream gets it once the push loop
   * has drained the shared structure.
   */
  def onError(ex: Throwable): Unit = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      errorThrown = ex
      upstreamIsComplete = true
      lock.acquire()
      pushToConsumer()
    }
  }

  /**
   * Marks upstream as complete. Downstream is completed once the push loop has
   * drained the shared structure.
   */
  def onComplete(): Unit = {
    if (!upstreamIsComplete && !downstreamIsDone) {
      upstreamIsComplete = true
      lock.acquire()
      pushToConsumer()
    }
  }

  /**
   * Must be called while holding `lock`, and releases it.
   *
   * Schedules a new push loop unless one is already running. The flag is
   * claimed under the lock, so only one caller can start a loop.
   */
  private[this] def pushToConsumer(): Unit = {
    val shouldRunPushLoop = !isPushLoopRunning
    if (shouldRunPushLoop) {
      isPushLoopRunning = true
    }
    lock.release()
    if (shouldRunPushLoop) {
      scheduler.execute(() ⇒ pushLoop(0))
    }
  }

  /** Restarts the push loop after an asynchronous acknowledgement. */
  private[this] def rescheduled(): Unit = {
    pushLoop(0)
  }

  /**
   * Repeatedly takes the next item from the strategy and emits it downstream.
   *
   * Runs synchronously while acks are already completed, for up to a batch of
   * items (`syncIndex` counts within the batch). Otherwise it continues from
   * the ack's completion callback. When the structure is empty, the loop goes
   * idle. If upstream has also finished (or an error was recorded), it instead
   * delivers the terminal event exactly once.
   */
  @tailrec
  private[this] def pushLoop(syncIndex: Int): Unit = {
    if (!downstreamIsDone) {
      lock.acquire()
      val maybeItemToEmit: Option[R] =
        try {
          val (maybeItem, newState) = strategy.get(state)
          state = newState
          maybeItem
        } catch {
          case NonFatal(error) ⇒
            errorThrown = error
            None
        }
      // Decide under the lock whether this loop stops, and whether it stops
      // for good. If this were decided after the release:
      // - a concurrent onNext could see the loop as running and skip starting
      //   one, while this loop then exits, stranding its element in `state`;
      // - a loop started by onComplete could deliver the terminal event a
      //   second time.
      val isTerminal =
        maybeItemToEmit.isEmpty && (upstreamIsComplete || (errorThrown ne null))
      if (isTerminal) {
        // isPushLoopRunning deliberately stays true, so no new loop is started.
        downstreamIsDone = true
        state = strategy.empty // for GC purposes
      } else if (maybeItemToEmit.isEmpty) {
        isPushLoopRunning = false
      }
      lock.release()

      maybeItemToEmit match {
        case Some(itemToEmit) ⇒
          val ack = underlying onNext itemToEmit
          val nextSyncIndex =
            if (!ack.isCompleted) 0
            else (syncIndex + 1) & batchSizeModulus

          if (nextSyncIndex > 0) {
            if (ack == Continue || ack.value.get == Continue.IsSuccess) {
              // process next
              pushLoop(nextSyncIndex)
            } else if (ack == Cancel || ack.value.get == Cancel.IsSuccess) {
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
            } else if (ack.value.get.isFailure) {
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(ack.value.get.failed.get)
            } else {
              // never happens
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(new MatchError(ack.value.get.toString))
            }
          } else ack.onComplete {
            case Continue.IsSuccess ⇒
              // re-run loop (in different thread)
              rescheduled()
            case Cancel.IsSuccess ⇒
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
            case Failure(ex) ⇒
              // ending loop
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(ex)
            case other ⇒
              // never happens, but to appease the Scala compiler
              downstreamIsDone = true
              isPushLoopRunning = false
              underlying.onError(new MatchError(s"$other"))
          }

        case None ⇒
          if (isTerminal) {
            if (errorThrown ne null) underlying.onError(errorThrown)
            else underlying.onComplete()
          }
      }
    }
  }
}
/**
 * Describes how a particular instance of the `producerConsumer` operator
 * buffers produced elements and hands them out to the consumer.
 *
 * The operator keeps a single value of `StateType`. It starts from `empty`,
 * and each call to `add` (producer side) or `get` (consumer side) replaces it
 * with the state that call returns.
 *
 * @tparam T The type of the input (observed, upstream, producer) elements
 * @tparam R The type of the output (generated, downstream, consumer) elements
 */
abstract class ProducerConsumerStrategy[-T, +R] {

  /** The type of the state private to this particular strategy. */
  type StateType

  /** The initial state: a shared structure that holds no elements yet. */
  def empty: StateType

  /**
   * Computes the state that results from adding a freshly produced element.
   *
   * If this method throws an exception, `producerConsumer` signals it to the
   * downstream subscriber via `onError`.
   *
   * @param state   the state before the element was produced
   * @param element the newly produced element to add to the shared structure
   * @return the new state, presumably reflecting that `element` has been added
   */
  def add(state: StateType, element: T): StateType

  /**
   * Retrieves the next element for the consumer.
   *
   * Returns `Some(item)` together with the state left after removing that
   * item. Returns `None` when `state` already represents an empty structure.
   *
   * If this method throws an exception, `producerConsumer` signals it to the
   * downstream subscriber via `onError`.
   *
   * @param state the state before retrieval
   * @return the retrieved item (if any), paired with the new state
   */
  def get(state: StateType): (Option[R], StateType)
}
/* ------------------------- the unit tests --------------------------- */
package info.td.common.monix
import java.util.Date
import info.td.common.monix.MonixHelpers._
import info.td.common.monix.testing.MonixTestHelpers
import monifu.reactive.{Ack, Observable}
import utest._
import utest.framework.TestSuite
import scala.concurrent.duration._
import scala.util.Random
object ProducerConsumerTest extends TestSuite {
  val tests = TestSuite {
    // FIFO strategy: every `get` hands out the oldest buffered element.
    object GetByOneProducerConsumerStrategy extends ProducerConsumerStrategy[Int, Int] {
      type StateType = List[Int]

      override def add(elements: StateType, element: Int) = elements :+ element

      override def get(elements: StateType) = {
        elements match {
          case element :: tail ⇒ (Some(element), tail)
          case Nil ⇒ (None, Nil)
        }
      }

      def empty = Nil
    }

    // Batching strategy: every `get` drains everything buffered so far as one list.
    object GetAllProducerConsumerStrategy extends ProducerConsumerStrategy[Int, List[Int]] {
      type StateType = List[Int]

      override def add(elements: StateType, element: Int) = elements :+ element

      override def get(elements: StateType) = {
        if (elements.isEmpty) {
          (None, Nil)
        } else {
          (Some(elements), Nil)
        }
      }

      override def empty = Nil
    }

    val error: RuntimeException = new RuntimeException("Test error")

    // Strategy whose `add` always fails with `error`.
    object ErrorWhenAddingProducerConsumerStrategy extends ProducerConsumerStrategy[Int, Any] {
      type StateType = Any

      override def add(elements: StateType, element: Int) = {
        throw error
      }

      override def get(state: StateType) = (None, ())

      override def empty: Unit = ()
    }

    // Strategy whose `get` always fails with `error`.
    object ErrorWhenGettingProducerConsumerStrategy extends ProducerConsumerStrategy[Int, Any] {
      type StateType = Any

      override def add(elements: StateType, element: Int) = ()

      override def get(state: StateType) = {
        throw error
      }

      override def empty: Unit = ()
    }

    "getByOne" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 20.milli
      val result: Vector[Int] = MonixTestHelpers
        .delaysBetweenEmissions(Seq(100.milli, 100.milli, 200.milli))
        .map(_.toInt)
        .producerConsumer(GetByOneProducerConsumerStrategy)
        .doWork(_ ⇒ schedulerProvider.advanceTimeBy(timeSlice * 3 + timeSlice / 5))
        .toVectorWithAction(timeSlice * 15) {
          schedulerProvider.advanceTimeBy(timeSlice * 15)
        }(schedulerProvider.defaultScheduler)
      val expectedResult = Vector(0, 1, 2)
      assert(result == expectedResult)
    }

    "getAll" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 20.milli
      val result: Vector[List[Int]] =
        Observable
          .intervalAtFixedRate(timeSlice)
          .map(_.toInt)
          .take(10)
          .producerConsumer(GetAllProducerConsumerStrategy)
          .doWork(_ ⇒ schedulerProvider.advanceTimeBy(timeSlice * 3 + timeSlice / 5))
          .toVectorWithAction(timeSlice * 15) {
            schedulerProvider.advanceTimeBy(timeSlice * 15)
          }(schedulerProvider.defaultScheduler)
      assert(result.size == 4)
      assert(result(0) == List(0))
      assert(result(1) == List(1, 2, 3))
      assert(result(2) == List(4, 5, 6))
      assert(result(3) == List(7, 8, 9))
    }

    "getAll many elements slow subscriber" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 2
      val numberOfElements = 1000
      val r = new Random(new Date().getTime)
      val xsList =
        MonixTestHelpers
          .delaysBetweenEmissions(Seq.tabulate(numberOfElements)(_ ⇒ r.nextInt(timeSlice).milli))
          .map(_.toInt)
          .producerConsumer(GetAllProducerConsumerStrategy)
          // Slow consumer: advance the clock by a random amount per element.
          // This was previously written `(_, advanceTimeBy(...))`, which built
          // a tuple-returning lambda and only compiled via value discarding.
          .doWork(_ ⇒ schedulerProvider.advanceTimeBy(r.nextInt(timeSlice * 3).milli))
          .toVectorWithAction((numberOfElements * timeSlice).milli) {
            schedulerProvider.advanceTimeBy(100.milli)
          }(schedulerProvider.defaultScheduler)
      val flattenedResult = xsList.flatten
      val expectedResult = (0 until numberOfElements).toVector
      assert(flattenedResult == expectedResult)
    }

    "getAll many elements fast subscriber" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      val timeSlice = 2
      val numberOfElements = 1000
      val r = new Random(new Date().getTime)
      val xsList =
        MonixTestHelpers
          .delaysBetweenEmissions(Seq.tabulate(numberOfElements)(_ ⇒ r.nextInt(timeSlice).milli))
          .map(_.toInt)
          .producerConsumer(GetAllProducerConsumerStrategy)
          .toVectorWithAction((numberOfElements * timeSlice).milli) {
            schedulerProvider.advanceTimeBy((numberOfElements * timeSlice).milli)
          }(schedulerProvider.defaultScheduler)
      val flattenedResult = xsList.flatten
      val expectedResult = (0 until numberOfElements).toVector
      assert(flattenedResult == expectedResult)
    }

    // Subscribes to `o`, advances the provider's clock by `time`, and checks
    // that `o` failed with the shared `error`.
    def assertErrorEmitted[T](o: Observable[T], time: FiniteDuration)(schedulerProvider: SchedulerProvider): Unit = {
      var errorThrown: Throwable = null
      // Named `ex` so it does not shadow the shared `error` compared against below.
      o.subscribe(_ ⇒ Ack.Continue, ex ⇒ errorThrown = ex)(schedulerProvider.defaultScheduler)
      schedulerProvider.advanceTimeBy(time)
      assert(errorThrown == error)
    }

    "error producing ProducerConsumer" - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      assertErrorEmitted(Observable
        .from(1, 2, 3)
        .producerConsumer(ErrorWhenAddingProducerConsumerStrategy),
        100.milli
      )(schedulerProvider)
    }

    "get throwing an error is handled correctly " - MonixTestHelpers.testWithSchedulers { schedulerProvider ⇒
      assertErrorEmitted(Observable
        .from(1, 2, 3)
        .producerConsumer(ErrorWhenGettingProducerConsumerStrategy),
        100.milli
      )(schedulerProvider)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment