Skip to content

Instantly share code, notes, and snippets.

@heliocentrist
Created December 8, 2016 14:29
Show Gist options
  • Save heliocentrist/dd204493cb9abe97c0fa9db1b491ba0d to your computer and use it in GitHub Desktop.
Save heliocentrist/dd204493cb9abe97c0fa9db1b491ba0d to your computer and use it in GitHub Desktop.
/** Consumer that adds up every `Int` it receives and reports the total as a `Long`.
  *
  * Every `onNext` call sleeps for 20 seconds after updating the total, which
  * makes this subscriber a deliberately slow downstream.
  */
val sumConsumer: Consumer[Int,Long] =
  new Consumer[Int,Long] {
    def createSubscriber(cb: Callback[Long], s: Scheduler) = {
      val subscriber = new Subscriber.Sync[Int] {
        implicit val scheduler: Scheduler = s

        // Running total of all elements received so far.
        private var total = 0L

        def onError(ex: Throwable): Unit =
          // The stream failed, so forward the error to the callback.
          cb.onError(ex)

        def onComplete(): Unit =
          // The stream is finished, so publish the final total.
          cb.onSuccess(total)

        def onNext(elem: Int): Continue = {
          total = total + elem
          // Block for 20 seconds before acknowledging the element.
          Thread.sleep(20000)
          Continue
        }
      }

      // The subscriber is paired with a dummy AssignableCancelable
      // because we don't intend to use the cancelable.
      (subscriber, AssignableCancelable.dummy)
    }
  }
// Source observable created with a 1-second interval.
val source = Observable.interval(1.second)

// Take the first five items, replace each with 1, apply a 1-second
// slow-downstream timeout, and feed the result into `sumConsumer`.
// NOTE(review): `sumConsumer` sleeps 20 seconds per element, so the timeout
// is presumably expected to fire -- confirm against the Monix documentation.
val task = source.take(5)
  .map(_ => 1)
  .timeoutOnSlowDownstream(1.second)
  .runWith(sumConsumer)

// Wait up to 10 seconds for the task to finish, then inspect how it completed.
// Only a DownstreamTimeoutException counts as the expected result; any other
// outcome fails the check. Previously the expected branch also called `fail`,
// so the check could never pass.
Await.ready(task.runAsync, 10.seconds).value match {
  case Some(Failure(_: DownstreamTimeoutException)) =>
    // Yay, got timeout! This is the outcome we want, so there is nothing to report.
    ()
  case Some(Failure(e)) => fail(s"Wrong exception type: $e")
  case Some(Success(v)) => fail(s"Expected a timeout exception, got: $v")
  // Await.ready returns only once the future is completed, so this is a safeguard.
  case None => fail("Future was not completed after Await.ready returned")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment