Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

An attempt to implement InterleaveObservable in monix with the similar behavior in scalaz-stream/fs2

import monix.execution._
import monix.execution.cancelables._
import monix.execution.Ack._
import monix.reactive._
import monix.reactive.observers._

import scala.concurrent._

final class InterleaveObservable[A](obsA1: Observable[A], obsA2: Observable[A]) extends Observable[A] { self =>
  def unsafeSubscribeFn(out: Subscriber[A]): Cancelable = {
    import out.scheduler

    // MUST BE synchronized by `self`
    var isDone = false
    // MUST BE synchronized by `self`
    var lastAck = Continue : Future[Ack]
    // MUST BE synchronized by `self`
    // select indicates which child observable that the `onNext` callback should listen to at the moment
    // in this case, select alternates between 1 (listen to obsA1) and -1 (listen to obsA2)
    var select = 0
    // MUST BE synchronized by `self`
    var continueA1 = Promise[Ack]() // this essentially serves as a lock for obsA2 when `select` is not assigned to it
    var continueA2 = Promise[Ack]() // this essentially serves as a lock for obsA1 when `select` is not assigned to it
    // MUST BE synchronized by `self`
    var completedCount = 0

    def rawOnNext(a: A): Future[Ack] = {
      try out.onNext(a)
      finally {
        if (completedCount == 0) select *= -1
      }
    }

    def signalOnNextA1(a: A): Future[Ack] = {
      lastAck = lastAck match {
        case Continue  rawOnNext(a)
        case Stop  if (select == 0) rawOnNext(a) else Stop
        case async 
          async.flatMap {
            case Continue  self.synchronized(rawOnNext(a))
            case Stop      self.synchronized(if (select == 0) rawOnNext(a) else Stop)
          }
      }

      continueA1.tryCompleteWith(lastAck)
      continueA1 = Promise[Ack]()
      lastAck
    }

    def signalOnNextA2(a: A): Future[Ack] = {
      lastAck = lastAck match {
        case Continue  rawOnNext(a)
        case Stop  if (select == 0) rawOnNext(a) else Stop
        case async 
          async.flatMap {
            case Continue  self.synchronized(rawOnNext(a))
            case Stop      self.synchronized(if (select == 0) rawOnNext(a) else Stop)
          }
      }

      continueA2.tryCompleteWith(lastAck)
      continueA2 = Promise[Ack]()
      lastAck
    }

    def signalOnError(ex: Throwable): Unit = self.synchronized {
      if (!isDone) {
        isDone = true
        out.onError(ex)
        lastAck = Stop
        continueA1.tryCompleteWith(Stop)
        continueA2.tryCompleteWith(Stop)
      }
    }

    def signalOnComplete(): Unit = {
      @inline def rawOnComplete(): Unit =
        if (!isDone) {
          isDone = true
          out.onComplete()
        }

      self.synchronized  {
        val shouldComplete = !isDone && (
          {
            completedCount += 1
            completedCount == 2
          })

        if (shouldComplete) {
          lastAck match {
            case Continue  rawOnComplete()
            case Stop      () // do nothing
            case async    
              async.onComplete {
                case Success(Continue) 
                  self.synchronized(rawOnComplete())
                case _                 
                  () // do nothing
              }
          }

          lastAck = Stop
        }
      }
    }

    val composite = CompositeCancelable()

    composite += obsA1.unsafeSubscribeFn(new Subscriber[A] {
      implicit val scheduler = out.scheduler

      def onNext(elem: A): Future[Ack] = self.synchronized {
        if (isDone) Stop else {

          hasElemA1 = true
          if (select == 0) select = 1

          if (select == 1)
            signalOnNextA1(elem)
          else
          // here essentially means waiting until obsA2 is done with its work
          // when that happens, before passing the `Continue` signal to the upstream observable,
          // immediately signal `onNext` of the out Observable to emit the elem since it is already available
            continueA2.future.flatMap {
              case Continue 
                self.synchronized { signalOnNextA1(elem) }
              case Stop     
                self.synchronized { if (select == 0) signalOnNextA1(elem) else Stop }
            }
        }
      }

      def onError(ex: Throwable): Unit =
        signalOnError(ex)
      def onComplete(): Unit = self.synchronized {
        signalOnComplete()
        continueA1.tryCompleteWith(Stop)
        select = 0
      }
    })

    composite += obsA2.unsafeSubscribeFn(new Subscriber[A] {
      implicit val scheduler = out.scheduler

      def onNext(elem: A): Future[Ack] = self.synchronized {
        if (isDone) Stop else {
          hasElemA2 = true
          if (select == 0) select = -1

          if (select == -1)
            signalOnNextA2(elem)
          else
            // here essentially means waiting until obsA1 is done with its work
            // when that happens, before passing the `Continue` signal to the upstream observable,
            // immediately signal `onNext` of the out Observable to emit the elem since it is already available
            continueA1.future.flatMap {
              case Continue 
                self.synchronized { signalOnNextA2(elem) }
              case Stop     
                self.synchronized { if (select == 0) signalOnNextA2(elem) else Stop }
            }
        }
      }

      def onError(ex: Throwable): Unit =
        signalOnError(ex)
      def onComplete(): Unit = self.synchronized {
        signalOnComplete()
        continueA2.tryCompleteWith(Stop)
        select = 0
      }
    })

    composite
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.