Skip to content

Instantly share code, notes, and snippets.

@xgrommx
Forked from gburgett/ObservableExtensions.scala
Created December 31, 2013 19:14
Show Gist options
  • Save xgrommx/8201022 to your computer and use it in GitHub Desktop.
Save xgrommx/8201022 to your computer and use it in GitHub Desktop.
package com.SimplyTrackable.logviewer.utils
import scala.concurrent.Future
import rx.Observable
import rx.Observer
import scala.concurrent.Promise
import scala.collection.generic.CanBuildFrom
import scala.concurrent.ExecutionContext
object ObservableExtensions {
class RichFutureTraversable[A, M[_] <: TraversableOnce[_]](wrapped: M[Future[A]]){
/**
* Converts the sequence of future values into an observable.
*/
def toObservable(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Observable[A] = {
wrapped
}
}
implicit def traversableToRichFutureTraversable[A, M[_] <: TraversableOnce[_]](in: M[Future[A]]) = new RichFutureTraversable(in)
class RichFutureObservable[A](wrapped: Future[Observable[A]]) {
/**
* Converts the future observable into an observable
*/
def flatten(implicit executor: ExecutionContext): Observable[A] = {
Observable.create( (observer: Observer[A]) => {
wrapped.onSuccess( { case observable => observable.subscribe(observer) } )
wrapped.onFailure( {
case ex:Exception => observer.onError(ex)
case th:Throwable => observer.onError(new Exception("Uncaught throwable in future", th))
} )
} )
}
}
implicit def futureObservableToRichFutureObservable[A](in: Future[Observable[A]]) = new RichFutureObservable[A](in)
/**
* Implicitly transforms a sequence of futures into an observable. The observable will maintain the same ordering as the input sequence, and will push
* values as the futures complete. This is a non-blocking operation.
*
* Note that the result of any individual future in the input sequence is only pushed after all previous futures have completed, so futures later in the sequence
* which complete early may "sit in the queue" for some time until earlier futures complete. If an error occurs in any one future, the "onError" method will
* be called for that one exception; any futures earlier in the sequence will have already been pushed, and no futures later in the sequence will be pushed.
*/
implicit def futures2Observable[A, M[_] <: TraversableOnce[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext) : Observable[A] = {
Observable.create( (obs: Observer[A]) => {
val seq = in.foldLeft(Promise.successful(cbf(in)).future) {
(fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield {
//push the value to the observable
obs.onNext(a)
//add to the builder so the next future can wait on this one
(r += a)
}
} map (_.result)
seq.onFailure( {
case ex:Exception => obs.onError(ex)
case th:Throwable => obs.onError(new Exception("Uncaught throwable in future", th))
} )
seq.onSuccess( { case v => obs.onCompleted() } )
} )
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment