Skip to content

Instantly share code, notes, and snippets.

@gburgett
Created July 29, 2013 18:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save gburgett/6106724 to your computer and use it in GitHub Desktop.
Save gburgett/6106724 to your computer and use it in GitHub Desktop.
Pimp My Library style extensions for converting between scala.concurrent.Future and rxjava's Observables
package com.SimplyTrackable.logviewer.utils
import scala.concurrent.Future
import rx.Observable
import rx.Observer
import scala.concurrent.Promise
import scala.collection.generic.CanBuildFrom
import scala.concurrent.ExecutionContext
/**
 * "Pimp my library" style extensions for bridging `scala.concurrent.Future` and rx `Observable`.
 *
 * Bring the conversions into scope with `import ObservableExtensions._`.
 */
object ObservableExtensions {
  // Local imports keep this object self-contained: the feature imports acknowledge the
  // implicit conversions / higher-kinded type parameters used below, and Success/Failure
  // are needed for Future.onComplete.
  import scala.language.{higherKinds, implicitConversions}
  import scala.util.{Failure, Success}

  /**
   * Forwards the cause of a failed future to `observer.onError`.
   *
   * An `Exception` is passed through unchanged; any other `Throwable` is wrapped in an
   * `Exception` (with the original as its cause) before being handed to the observer.
   *
   * @param observer the observer to notify
   * @param cause    the throwable the future failed with
   */
  private def forwardFailure[A](observer: Observer[A], cause: Throwable): Unit = cause match {
    case ex: Exception => observer.onError(ex)
    case other         => observer.onError(new Exception("Uncaught throwable in future", other))
  }

  /**
   * Wrapper adding `toObservable` to a collection of futures.
   *
   * @tparam A the value type produced by each future
   * @tparam M the collection type holding the futures
   * @param wrapped the collection of futures being extended
   */
  class RichFutureTraversable[A, M[_] <: TraversableOnce[_]](wrapped: M[Future[A]]) {
    /**
     * Converts the sequence of future values into an observable.
     *
     * Delegates to [[futures2Observable]]; see that method for ordering and error semantics.
     *
     * @param cbf      builder factory for `M[A]`, passed on to [[futures2Observable]]
     * @param executor execution context on which the future callbacks run
     * @return an observable emitting the futures' values in the order of the input sequence
     */
    def toObservable(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Observable[A] =
      // Call the conversion explicitly rather than returning `wrapped` and relying on the
      // implicit view M[Future[A]] => Observable[A] to kick in silently.
      futures2Observable[A, M](wrapped)(cbf, executor)
  }

  /** Implicitly enriches a collection of futures with [[RichFutureTraversable]] methods. */
  implicit def traversableToRichFutureTraversable[A, M[_] <: TraversableOnce[_]](in: M[Future[A]]): RichFutureTraversable[A, M] =
    new RichFutureTraversable[A, M](in)

  /**
   * Wrapper adding `flatten` to a future that yields an observable.
   *
   * @tparam A the element type of the inner observable
   * @param wrapped the future observable being extended
   */
  class RichFutureObservable[A](wrapped: Future[Observable[A]]) {
    /**
     * Converts the future observable into an observable.
     *
     * When the future succeeds, the given observer is subscribed to the resulting observable.
     * When the future fails, the observer's `onError` is called: an `Exception` is forwarded
     * as is, any other `Throwable` is wrapped in an `Exception`.
     *
     * @param executor execution context on which the future callback runs
     * @return an observable that defers subscription until `wrapped` completes
     */
    def flatten(implicit executor: ExecutionContext): Observable[A] = {
      // NOTE(review): the function passed to Observable.create evaluates to Unit (the result
      // of onComplete), not an rx Subscription, so unsubscribing cannot detach from the
      // future -- confirm this is acceptable for the RxJava version in use.
      Observable.create( (observer: Observer[A]) => {
        wrapped.onComplete {
          case Success(observable) => observable.subscribe(observer)
          case Failure(cause)      => forwardFailure(observer, cause)
        }
      } )
    }
  }

  /** Implicitly enriches a future observable with [[RichFutureObservable]] methods. */
  implicit def futureObservableToRichFutureObservable[A](in: Future[Observable[A]]): RichFutureObservable[A] =
    new RichFutureObservable[A](in)

  /**
   * Implicitly transforms a sequence of futures into an observable. The observable maintains
   * the same ordering as the input sequence and pushes values as the futures complete.
   * This is a non-blocking operation.
   *
   * Note that the result of any individual future in the input sequence is only pushed after
   * all previous futures have completed, so futures later in the sequence which complete early
   * may "sit in the queue" for some time until earlier futures complete. If an error occurs in
   * any one future, `onError` is called for that one exception; values of futures earlier in
   * the sequence will already have been pushed, and no values of later futures will be pushed.
   * Once every future has completed successfully, `onCompleted` is called.
   *
   * @tparam A the value type produced by each future
   * @tparam M the collection type holding the futures
   * @param in       the futures to expose as an observable
   * @param cbf      builder factory used to accumulate the completed values
   * @param executor execution context on which the future callbacks run
   * @return an observable emitting each future's value in input order
   */
  implicit def futures2Observable[A, M[_] <: TraversableOnce[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext) : Observable[A] = {
    Observable.create( (obs: Observer[A]) => {
      // Start from an already-completed future holding an empty builder.
      val start = Future.successful(cbf(in))

      // Chain the futures left to right: each step waits for the previous step *and* for its
      // own future, which is what enforces in-order emission.
      val chained = in.foldLeft(start) { (previous, next) =>
        for {
          builder <- previous
          // The element type is existential because of the `TraversableOnce[_]` bound,
          // hence the cast back to the statically known Future[A].
          value   <- next.asInstanceOf[Future[A]]
        } yield {
          // push the value to the observable
          obs.onNext(value)
          // add to the builder so the next future can wait on this one
          builder += value
        }
      }

      // Only the outcome (success/failure) of the chain is used to terminate the observer.
      chained.map(_.result()).onComplete {
        case Success(_)     => obs.onCompleted()
        case Failure(cause) => forwardFailure(obs, cause)
      }
    } )
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment