Skip to content

Instantly share code, notes, and snippets.

@Tvaroh
Created December 21, 2017 14:42
Show Gist options
  • Save Tvaroh/8dad4aaaf674a23aa106c3a8ce0d2786 to your computer and use it in GitHub Desktop.
Save Tvaroh/8dad4aaaf674a23aa106c3a8ce0d2786 to your computer and use it in GitHub Desktop.
Asynchronous "iterator" to Reactive Streams publisher
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.control.NonFatal
/**
 * Reactive Streams [[Publisher]] backed by an asynchronous pull function.
 *
 * Every call to `subscribe` hands the subscriber a fresh [[Subscription]]; all of them pull from
 * the same `next` function and report to the same `done` callback.
 *
 * @param next pulled once per unit of requested demand. `Some(t)` is forwarded via `onNext`,
 *             `None` ends the stream via `onComplete`, and a failed future (or a thrown
 *             non-fatal exception) ends it via `onError`.
 * @param done invoked at most once per subscription, when that subscription completes, fails
 *             or is cancelled.
 * @tparam T element type
 */
class AsyncPublisher[T](next: () => Future[Option[T]],
                        done: () => Unit)
  extends Publisher[T] {

  import java.util.concurrent.atomic.AtomicBoolean

  import AsyncPublisher._

  override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
    subscriber.onSubscribe {
      new Subscription {

        // Outstanding demand plus a flag telling whether the emission loop is currently active.
        private val state: AtomicReference[State] = new AtomicReference(StoppedState)

        // Flipped exactly once: on completion, on error or on cancel.
        private val terminated = new AtomicBoolean(false)

        // Set by a non-positive `request`; delivered by the emission loop so that `onError`
        // never overlaps with an `onNext` issued by that loop.
        private val invalidRequest = new AtomicReference[Option[Throwable]](None)

        override def request(n: Long): Unit =
          if (!terminated.get()) {
            val granted =
              if (n > 0L) n
              else {
                invalidRequest.compareAndSet(None, Some(new IllegalArgumentException(
                  s"Reactive Streams rule 3.9: number of requested elements must be positive, got $n")))
                // One unit of demand wakes (or starts) the loop; the loop checks
                // `invalidRequest` before spending demand, so no element is emitted for it.
                1L
              }
            val running = state.getAndUpdate(incrementDemand(granted)).isRunning
            if (!running) {
              // Only the caller that flipped Stopped -> Running drives the loop; every other
              // concurrent request just adds demand and returns.
              pump().recover { case NonFatal(ex) => terminate(subscriber.onError(ex)) }
              ()
            }
          }

        override def cancel(): Unit = terminate(())

        /** Emits elements while there is demand, then parks the subscription in the stopped state. */
        private def pump(): Future[Unit] =
          if (terminated.get()) FutureUnit // state stays "running", so later requests cannot restart the loop
          else {
            val demand = state.getAndUpdate(decrementDemand).demand
            if (demand > 0)
              invalidRequest.get() match {
                case Some(ex) => Future.failed[Unit](ex)
                case None =>
                  pull().flatMap {
                    case Some(t) =>
                      // The subscription may have been cancelled while `next()` was in flight.
                      if (!terminated.get()) subscriber.onNext(t)
                      pump()
                    case None =>
                      terminate(subscriber.onComplete())
                      FutureUnit
                  }
              }
            else if (state.compareAndSet(ZeroDemandRunningState, StoppedState)) FutureUnit
            else pump() // demand changed concurrently
          }

        /** Calls `next()`, turning a synchronously thrown non-fatal exception into a failed future. */
        private def pull(): Future[Option[T]] =
          try next()
          catch { case NonFatal(ex) => Future.failed(ex) }

        /** Runs `signal` followed by `done()` on the first call only; later calls do nothing. */
        private def terminate(signal: => Unit): Unit =
          if (terminated.compareAndSet(false, true)) {
            try signal
            finally done()
          }
      }
    }
}
/** State model and allocation-free helpers used by the subscriptions of [[AsyncPublisher]]. */
object AsyncPublisher {

  /** Snapshot of a subscription: the outstanding demand and whether the emission loop is active. */
  private sealed trait State {
    def demand: Long
    def isRunning: Boolean
  }

  private[this] object State {

    /** No emission loop is active and there is no demand. */
    case object Stopped extends State {
      override def demand: Long = 0L
      override def isRunning: Boolean = false
    }

    /** An emission loop is active with `demand` elements still owed to the subscriber. */
    final case class Running(override val demand: Long) extends State {
      override def isRunning: Boolean = true
    }

    object Running {
      // Equal to, but NOT the same instance as, the pooled zero-demand state; identity-based
      // compare-and-set must use ZeroDemandRunningState instead.
      val Empty = Running(0L)
    }
  }

  // Largest demand value (and largest increment) served from the pre-allocated pools below.
  private[this] final val PoolSize = 16

  // The single canonical "running with zero demand" instance. `runningState(0)` always returns
  // this very object, which is what makes AtomicReference.compareAndSet against it meaningful.
  private val ZeroDemandRunningState: State = RunningStatePool(0)

  private val StoppedState: State = State.Stopped

  /** Moves to a running state with one unit of demand less, never going below zero. */
  private val decrementDemand: UnaryOperator[State] =
    new UnaryOperator[State] {
      override def apply(s: State): State = runningState(Math.max(s.demand - 1, 0L))
    }

  /**
   * Operator that moves to a running state with `n` more units of demand.
   *
   * @param n number of additional elements requested; must be positive
   * @throws IllegalArgumentException if `n` is not positive
   */
  private def incrementDemand(n: Long): UnaryOperator[State] = {
    require(n > 0L, s"demand increment must be positive, got $n")
    if (n <= IncrementDemandPool.length) IncrementDemandPool(n.toInt - 1)
    else mkIncrementDemand(n)
  }

  // Pre-built operators for increments 1..PoolSize; slot i holds the operator for i + 1.
  private[this] lazy val IncrementDemandPool: Array[UnaryOperator[State]] =
    (1L to PoolSize.toLong).map(mkIncrementDemand).toArray

  private[this] def mkIncrementDemand(n: Long): UnaryOperator[State] =
    new UnaryOperator[State] {
      override def apply(s: State): State = {
        val sum = s.demand + n
        // Both operands are non-negative, so a negative sum can only mean Long overflow:
        // saturate instead of wrapping around.
        runningState(if (sum < 0L) Long.MaxValue else sum)
      }
    }

  // Pre-built running states for demand 0..PoolSize; slot i holds Running(i).
  private[this] lazy val RunningStatePool: Array[State] =
    (0L to PoolSize.toLong).map(State.Running(_)).toArray

  /** Running state for `demand`, taken from the pool when the value is small enough. */
  private[this] def runningState(demand: Long): State =
    if (demand >= 0L && demand < RunningStatePool.length) RunningStatePool(demand.toInt)
    else State.Running(demand)

  // Shared already-completed future, so finishing a loop iteration allocates nothing.
  private val FutureUnit: Future[Unit] = Future.successful(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment