Created
December 21, 2017 14:42
-
-
Save Tvaroh/8dad4aaaf674a23aa106c3a8ce0d2786 to your computer and use it in GitHub Desktop.
Asynchronous "iterator" to Reactive Streams publisher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicReference | |
import java.util.function.UnaryOperator | |
import org.reactivestreams.{Publisher, Subscriber, Subscription} | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.Future | |
import scala.util.control.NonFatal | |
/**
 * Reactive Streams `Publisher` backed by an asynchronous "iterator".
 *
 * Every subscription pulls from the same `next` function, so subscribing more than once
 * makes the subscribers compete for elements.
 *
 * @param next pulls the next element; a future completed with `None` ends the stream
 * @param done release hook, invoked at most once per subscription: when the stream completes,
 *             fails, or the subscription is cancelled
 * @tparam T   type of the published elements
 */
class AsyncPublisher[T](next: () => Future[Option[T]],
                        done: () => Unit)
  extends Publisher[T] {

  import AsyncPublisher._

  override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
    subscriber.onSubscribe {
      new Subscription {

        import java.util.concurrent.atomic.AtomicBoolean

        // Outstanding demand plus a flag telling whether the emit loop is currently active.
        private val state: AtomicReference[State] = new AtomicReference(StoppedState)

        // Flipped exactly once: on cancel, completion or failure. Guards `done()` and terminal signals.
        private val closed = new AtomicBoolean(false)

        // Error caused by a non-positive `request`; delivered by the emit loop to keep signals serial.
        private val invalidRequest: AtomicReference[Option[Throwable]] =
          new AtomicReference[Option[Throwable]](None)

        override def request(n: Long): Unit =
          if (!closed.get()) {
            val increment =
              if (n > 0L) n
              else {
                invalidRequest.compareAndSet(None, Some(new IllegalArgumentException(
                  s"Subscription.request must be called with a positive amount (Reactive Streams rule 3.9), got $n")))
                1L // wake-up token: consumed by the loop when it delivers the error, never by an element
              }

            val running = state.getAndUpdate(incrementDemand(increment)).isRunning
            if (!running) {
              // Only the caller that flipped Stopped -> Running gets here; concurrent requests
              // merely add demand and return.
              val run: Future[Unit] =
                try loop()
                catch { case NonFatal(ex) => Future.failed[Unit](ex) } // `next()` threw synchronously
              run.recover { case NonFatal(ex) => fail(ex) }
              ()
            }
          }

        override def cancel(): Unit =
          if (closed.compareAndSet(false, true)) done()

        /** Emits elements while there is demand; parks (Stopped state) once the demand is exhausted. */
        private def loop(): Future[Unit] =
          if (closed.get()) FutureUnit // state intentionally stays Running so later requests cannot restart the loop
          else {
            val demand = state.getAndUpdate(decrementDemand).demand
            if (demand > 0)
              // Checked after the decrement: the wake-up token is added after the error is stored,
              // so whoever consumes the token is guaranteed to see the error.
              invalidRequest.get() match {
                case Some(ex) =>
                  Future.failed[Unit](ex)
                case None =>
                  next().flatMap {
                    case Some(t) =>
                      if (!closed.get()) subscriber.onNext(t) // drop the element if cancelled meanwhile
                      loop()
                    case None =>
                      complete()
                      FutureUnit
                  }
              }
            else if (state.compareAndSet(ZeroDemandRunningState, StoppedState)) FutureUnit
            else loop() // demand changed concurrently
          }

        /** Signals `onComplete` and releases resources unless the subscription is already closed. */
        private def complete(): Unit =
          if (closed.compareAndSet(false, true)) {
            try subscriber.onComplete()
            finally done()
          }

        /** Signals `onError` and releases resources; a failure that can no longer be delivered is reported to the execution context. */
        private def fail(ex: Throwable): Unit =
          if (closed.compareAndSet(false, true)) {
            try subscriber.onError(ex)
            finally done()
          } else global.reportFailure(ex)

      }
    }

}
/** Demand-tracking state and the atomic update operators shared by all subscriptions. */
object AsyncPublisher {

  /** Snapshot of a subscription: outstanding demand and whether an emit loop is active. */
  private sealed trait State {
    def demand: Long
    def isRunning: Boolean
  }

  private[this] object State {

    case object Stopped extends State {
      override def demand: Long = 0L
      override def isRunning: Boolean = false
    }

    final case class Running(override val demand: Long) extends State {
      override def isRunning: Boolean = true
    }

  }

  // Pre-allocated Running states for demands 0..16, so small demands do not allocate.
  // Declared before its first use below, hence a plain (non-lazy) val is sufficient.
  private[this] val RunningStatePool: Array[State] =
    (0L to 16L).map(State.Running(_)).toArray

  /** Running state for the given demand; pooled instances are reused for small values. */
  private[this] def runningState(demand: Long): State =
    if (demand >= 0L && demand < RunningStatePool.length) RunningStatePool(demand.toInt)
    else State.Running(demand)

  // Must be the pooled instance: the class compares it by reference via AtomicReference.compareAndSet.
  private val ZeroDemandRunningState: State = RunningStatePool(0)

  private val StoppedState: State = State.Stopped

  /** Takes one unit of demand (never going below zero) and marks the state as running. */
  private val decrementDemand: UnaryOperator[State] =
    new UnaryOperator[State] {
      override def apply(s: State): State = runningState(Math.max(s.demand - 1, 0L))
    }

  /** Operator adding `n` to the demand, saturating at `Long.MaxValue` instead of overflowing. */
  private[this] def mkIncrementDemand(n: Long): UnaryOperator[State] =
    new UnaryOperator[State] {
      override def apply(s: State): State = {
        val sum = s.demand + n
        // Both operands are non-negative, so a negative sum can only mean overflow.
        runningState(if (sum < 0L) Long.MaxValue else sum)
      }
    }

  // Pre-allocated operators for increments 1..16 (index = increment - 1).
  private[this] val IncrementDemandPool: Array[UnaryOperator[State]] =
    (1L to 16L).map(mkIncrementDemand).toArray

  /**
   * Operator that adds `n` to the demand and marks the state as running.
   *
   * @param n number of additional elements requested; must be positive
   * @throws IllegalArgumentException if `n` is not positive
   */
  private def incrementDemand(n: Long): UnaryOperator[State] = {
    require(n > 0L, s"demand increment must be positive, got $n")
    if (n <= IncrementDemandPool.length) IncrementDemandPool(n.toInt - 1)
    else mkIncrementDemand(n)
  }

  private val FutureUnit: Future[Unit] = Future.successful(())

}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment