Skip to content

Instantly share code, notes, and snippets.

@rnkoaa
Created January 11, 2017 18:12
Show Gist options
  • Save rnkoaa/79c633ce019d1c3fa9b9306b2b05374c to your computer and use it in GitHub Desktop.
Save rnkoaa/79c633ce019d1c3fa9b9306b2b05374c to your computer and use it in GitHub Desktop.
Convert a listenable future to an rxjava-2 flowable or single
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.reactivex.*;
/**
* Created on 1/11/2017.
*/
public class FlowableUtils {
public static <T> Single<T> fromFuture(ListenableFuture<? extends T> future) {
/* Single.defer(() -> {
Single.def
})*/
return Single.defer(() ->
Single.create((SingleOnSubscribe<T>) e -> {
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
e.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
e.onError(t);
}
});
}));
}
public static <T> Flowable<T> flowableFromFuture(ListenableFuture<? extends T> future) {
return Flowable.defer(() -> Flowable.create((FlowableOnSubscribe<T>) e -> {
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
e.onNext(result);
e.onComplete();
}
@Override
public void onFailure(Throwable t) {
e.onError(t);
}
});
}, BackpressureStrategy.BUFFER));
}
public static <T> Flowable<T> flowableFromFuture(ListenableFuture<? extends T> future, Scheduler scheduler) {
return Flowable.defer(() -> Flowable.create((FlowableOnSubscribe<T>) e -> {
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
e.onNext(result);
e.onComplete();
}
@Override
public void onFailure(Throwable t) {
e.onError(t);
}
});
}, BackpressureStrategy.BUFFER))
.subscribeOn(scheduler);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment