@quelgar
Last active March 12, 2017 06:48
import monix.reactive._
import monix.execution._
import Ack.{Stop, Continue}
import scala.util.control.NonFatal
import scala.concurrent._
import scala.concurrent.duration._
def feed[A](in: Iterator[A], out: Observer[A])
  (implicit s: Scheduler): Future[Ack] = {
  // Indicates whether errors that happen inside the
  // logic below should be streamed downstream with
  // onError, or whether we aren't allowed to because of
  // the Observer grammar. Basically we need to differentiate
  // between errors triggered by our data source, the
  // Iterator, and errors triggered by our Observer,
  // which isn't allowed to trigger exceptions.
  var streamErrors = true
  try {
    // Iterator protocol, we need to ask if it hasNext
    if (!in.hasNext) {
      // From this point on, we aren't allowed to call onError
      // because it can break the contract
      streamErrors = false
      // Signaling the end of the stream, then we are done
      out.onComplete()
      // Debugging aid: print the current stack trace to stderr
      Thread.dumpStack()
      Stop
    } else {
      // Iterator protocol, we get a next element
      val next = in.next()
      // From this point on, we aren't allowed to call onError
      // because it can break the contract
      streamErrors = false
      // Signaling onNext, then back-pressuring
      out.onNext(next).flatMap {
        case Continue =>
          // We got permission, go next
          feed(in, out)(s)
        case Stop =>
          // Nothing else to do, stop the loop
          Stop
      }
    }
  } catch {
    case NonFatal(ex) =>
      // The Iterator triggered the error, so stream it
      if (streamErrors)
        out.onError(ex)
      else // The Observer triggered the error, so log it
        s.reportFailure(ex)
      // Nothing else to do
      Stop
  }
}
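
// A hedged sketch, not Monix's own implementation of Observer.feed:
// one possible way to keep the stack flat when the Observer answers
// synchronously. The name feedSync is hypothetical. It relies on the
// imports at the top of this listing. Elements are pushed in a plain
// while loop for as long as onNext returns the Continue object itself;
// flatMap is only used once an ack is neither Continue nor Stop.
def feedSync[A](in: Iterator[A], out: Observer[A])
  (implicit s: Scheduler): Future[Ack] = {
  // Same meaning as in feed: true while an error can only have
  // come from the Iterator, false once the Observer is involved
  var streamErrors = true
  try {
    var ack: Future[Ack] = Continue
    var hasNext = in.hasNext
    // Synchronous fast path: no recursion while acks are Continue
    while (hasNext && (ack eq Continue)) {
      val next = in.next()
      streamErrors = false
      ack = out.onNext(next)
      if (ack eq Continue) {
        // Back to the Iterator, so its errors may be streamed again
        streamErrors = true
        hasNext = in.hasNext
      }
    }
    // From here on only the Observer is called
    streamErrors = false
    if (ack eq Stop) {
      // Downstream asked us to stop
      Stop
    } else if (ack eq Continue) {
      // The loop ended because the Iterator is exhausted
      out.onComplete()
      Stop
    } else {
      // Asynchronous ack: wait for it, then resume the loop
      ack.flatMap {
        case Continue => feedSync(in, out)(s)
        case Stop => Stop
      }
    }
  } catch {
    case NonFatal(ex) =>
      if (streamErrors)
        out.onError(ex)
      else
        s.reportFailure(ex)
      Stop
  }
}
// Usage mirrors feed, once an implicit Scheduler is in scope, e.g.
// Await.result(feedSync(Iterator.range(1, 10000), Observer.empty), 2.minutes)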
import monix.execution.Scheduler.Implicits.global
// Stack overflow (fixed in Monix 2.2.3, see https://github.com/monix/monix/issues/330):
Await.result(feed(Iterator.range(1, 10000), Observer.empty), 2.minutes)
// works:
//Await.result(Observer.feed(Observer.empty, 1 to 10000), 2.minutes)