Skip to content

Instantly share code, notes, and snippets.

@Tvaroh
Last active December 7, 2017 13:15
Show Gist options
  • Save Tvaroh/457db0e2c7ad7014d800552dd5cb7017 to your computer and use it in GitHub Desktop.
Save Tvaroh/457db0e2c7ad7014d800552dd5cb7017 to your computer and use it in GitHub Desktop.
Iterator to Reactive Streams publisher
import org.reactivestreams.{Publisher, Subscriber, Subscription}
import scala.util.control.NonFatal
/** Iterator-based Reactive Streams publisher.
* @param iterator iterator
* @param onDone done callback */
class CloseableIteratorPublisher[T](iterator: Iterator[T], onDone: () => Unit)
extends Publisher[T] {
override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
subscriber.onSubscribe(new Subscription {
override def request(n: Long): Unit = {
var requested = n
try {
while (requested > 0) {
if (iterator.hasNext) {
subscriber.onNext(iterator.next())
} else {
subscriber.onComplete()
onDone()
requested = 0
}
requested -= 1
}
} catch {
case NonFatal(ex) =>
subscriber.onError(ex)
onDone()
}
}
override def cancel(): Unit = {
onDone()
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment