Skip to content

Instantly share code, notes, and snippets.

@akozhemiakin
Created March 22, 2018 13:38
Show Gist options
  • Save akozhemiakin/4934a5ab463f7d2104c3bb35dd88a2a8 to your computer and use it in GitHub Desktop.
Save akozhemiakin/4934a5ab463f7d2104c3bb35dd88a2a8 to your computer and use it in GitHub Desktop.
import monix.execution.Ack.{Continue, Stop}
import monix.execution.{Ack, Scheduler}
import monix.reactive.Observable.Operator
import monix.reactive.observers.Subscriber
import scala.concurrent.{Future, Promise}
class ElasticBufferOperator[A](size: Int) extends Operator[A, A] {
override def apply(out: Subscriber[A]): Subscriber[A] = new Subscriber[A] {
private[this] var buffer: List[A] = Nil
private[this] var np: Option[Promise[A]] = None
private[this] var bf: Option[Promise[Ack]] = None
private[this] var stopped: Boolean = false
private[this] var isDone = false
override implicit val scheduler: Scheduler = out.scheduler
feed()
override def onError(ex: Throwable): Unit = {
if (!isDone) {
isDone = true
buffer = null
out.onError(ex)
}
}
override def onComplete(): Unit = {
if (!isDone) {
isDone = true
}
}
override def onNext(elem: A): Future[Ack] = {
if (stopped) Stop
np match {
case Some(x) =>
np = None
x.success(elem)
case None => buffer = buffer :+ elem
}
if (buffer.size < size) Continue else {
val p = Promise[Ack]
bf = Some(p)
p.future
}
}
private[this] def getNext: Future[A] = buffer.headOption.fold{
val p = Promise[A]
np = Some(p)
p.future
}{v =>
buffer = buffer.tail
if (buffer.isEmpty && isDone) {
buffer = null
}
bf.foreach{x =>
bf = None
x.success(Continue)
}
Future(v)
}
private[this] def feed(): Unit = {
getNext.flatMap(v => out.onNext(v)).foreach {
case Continue => feed()
case Stop => stopped = true
}
}
}
}
object syntax {
implicit class ObservableOps[A](a: Observable[A]) {
final def bufferElastic(size: Int): Observable[A] =
a.liftByOperator(new ElasticBufferOperator[A](size))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment