@danielesegato
Last active February 4, 2020 15:03
RxJava: Creating Observable from legacy asynchronous API using callbacks

About the gist

EDIT: This gist became obsolete soon after it was written, when Observable.fromAsync() was released in RxJava. An issue opened by Jake Wharton (ReactiveX/RxJava#4177) led to a pull request, implemented and discussed in ReactiveX/RxJava#4179. See below for how to use it.

I'm learning RxJava. Many Rx libraries out there use Observable.create() with an inline OnSubscribe implementation to wrap legacy APIs, like this one for the Android GoogleMap API:

class MapFragmentMapReadyOnSubscribe implements Observable.OnSubscribe<GoogleMap> {
  final MapFragment fragment;

  MapFragmentMapReadyOnSubscribe(MapFragment fragment) {
    this.fragment = fragment;
  }

  @Override public void call(final Subscriber<? super GoogleMap> subscriber) {
    MainThreadSubscription.verifyMainThread();

    OnMapReadyCallback callback = new OnMapReadyCallback() {
      @Override public void onMapReady(GoogleMap googleMap) {
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(googleMap);
        }
      }
    };

    fragment.getMapAsync(callback);
  }
}

According to Dávid Karnok this is very bad practice. Quoting him: "What about unsubscription and backpressure? No amount of operators can fix that for this source".

I wrote to him and, following his input, tried to create something generic to help wrap legacy APIs while taking backpressure into account. I'm not at all sure this is ACTUALLY better; it's a work in progress.
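For a source that produces a single value, taking backpressure into account mostly means holding the value until the consumer signals demand and then delivering it at most once. The plain-Java sketch below illustrates that idea without RxJava. `SingleValueProducerSketch` and its `Consumer`-based downstream are hypothetical names invented here, and the sketch deliberately leaves out completion and unsubscription handling.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
 * Hypothetical single-value producer, invented for this sketch.
 * It holds one value and hands it over only once the consumer asks for it.
 */
final class SingleValueProducerSketch<T> {
    private final T value;
    private final Consumer<? super T> downstream;
    private final AtomicBoolean emitted = new AtomicBoolean(false);

    SingleValueProducerSketch(T value, Consumer<? super T> downstream) {
        this.value = value;
        this.downstream = downstream;
    }

    /** Called by the consumer to signal that it can accept up to n items. */
    void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required");
        }
        if (n == 0) {
            return; // no demand yet, keep holding the value
        }
        // compareAndSet ensures the value is delivered at most once,
        // even if request() is called repeatedly or from several threads.
        if (emitted.compareAndSet(false, true)) {
            downstream.accept(value);
        }
    }
}
```

In the gist's class further down, this role is played by RxJava's own SingleProducer, which is handed to the subscriber through setProducer().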

EDIT: this is the new way you are supposed to do it:

class MyActivity extends AppCompatActivity {

  public Observable<GoogleMap> getGoogleMapObservable(MapFragment mapFragment) {
    // fromAsync runs this block on subscription and hands us an emitter
    return Observable.fromAsync(emitter -> {
      OnMapReadyCallback callback = new OnMapReadyCallback() {
        @Override
        public void onMapReady(GoogleMap googleMap) {
          // the map is delivered once, then the stream completes
          emitter.onNext(googleMap);
          emitter.onCompleted();
        }
      };

      mapFragment.getMapAsync(callback);
    }, AsyncEmitter.BackpressureMode.BUFFER);
  }
}

Old gist

This snippet is meant for APIs that return a SINGLE value or fail ONCE, more or less like a Promise. It does not support multiple returned values, or both values and errors. It does, however, support APIs that return something you can use to cancel the request.
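The single-value-or-single-failure contract described above is the same one a JDK `CompletableFuture` follows. Purely as a point of comparison (this is not the gist's class and it does not use RxJava), a minimal sketch of bridging a one-shot callback API to a future might look like the following. `LegacyCallback`, `LegacyCall` and `PromiseSketch` are hypothetical names made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical callback shape, invented for this sketch. */
interface LegacyCallback<T> {
    void onResult(T data, Exception error);
}

/** Hypothetical legacy call that reports its outcome through a callback. */
interface LegacyCall<T> {
    void start(LegacyCallback<T> callback);
}

final class PromiseSketch {
    private PromiseSketch() {}

    /** Bridges a one-shot callback API to a CompletableFuture. */
    static <T> CompletableFuture<T> toFuture(LegacyCall<T> call) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            call.start((data, error) -> {
                // A future can only be completed once, so any later
                // callback invocation is silently ignored.
                if (error != null) {
                    future.completeExceptionally(error);
                } else {
                    future.complete(data);
                }
            });
        } catch (RuntimeException e) {
            // The legacy call failed before it could register the callback.
            future.completeExceptionally(e);
        }
        return future;
    }
}
```

One difference worth keeping in mind: the future starts the legacy call immediately, whereas the Observable built by this gist only calls the API when something subscribes.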

Usage

Copy the class into your code and use it to create your Observables.

To create the observable from the Map API above you would do:

@NonNull
private Observable<GoogleMap> mapObservable(MapFragment mapFragment) {
    return RxWrapCallbackAPI.wrapAPI(
            (successCallback, failCallback) -> mapFragment.getMapAsync(successCallback::onSuccess));
}

In general suppose you have an API like this one:

public interface API<DATA> {
    void nonCancelableMethod(Callback<DATA> callback) throws Exception;
    Cancelable cancelableMethod(Callback<DATA> callback) throws Exception;

    // The callback
    interface Callback<DATA> {
        void onDataReady(DATA data, Exception error);
        void onDataFail(Exception error);
    }

    // this allows you to cancel the request
    interface Cancelable {
        void cancel() throws Exception; // note this can throw a Checked Exception
    }
}

When you need to wrap an API that has no way of canceling its execution:

// Non Cancelable API wrapping
Observable<Data> nonCancelableObservable = RxWrapCallbackAPI.wrapAPI(
        // successCallback and failCallback are provided by the RxWrapCallbackAPI
        // Here you call your API as you normally would,
        // it will be executed when you subscribe to the observable
        (successCallback, failCallback) -> api.nonCancelableMethod(new Callback<Data>() {
            @Override
            public void onDataReady(Data data, Exception error) {
                // this is your API callback
                if (error == null) {
                    // we have a non error result, pass it to the successCallback
                    successCallback.onSuccess(data);
                } else {
                    // we have an error :( pass it to the error callback
                    failCallback.onFailed(error);
                }
            }

            @Override
            public void onDataFail(Exception error) {
                // another method of your API, it's an error, so use the failCallback
                failCallback.onFailed(error);
            }
        })
);

When your legacy API does expose a way to cancel a call, it can be invoked automatically when the Observable is unsubscribed. Just use the wrapAPICancelable() method in that case:

// Cancelable API wrapping
Observable<Data> cancelableObservable = RxWrapCallbackAPI.wrapAPICancelable(
        // successCallback and failCallback are provided by the RxWrapCallbackAPI
        // Here you call your API as you normally would,
        // it will be executed when you subscribe to the observable
        // the only difference is that your API here returns something to cancel the execution
        (successCallback, failCallback) -> api.cancelableMethod(new Callback<Data>() {
            @Override
            public void onDataReady(Data data, Exception error) {
                // this is your API callback
                if (error == null) {
                    // we have a non error result, pass it to the successCallback
                    successCallback.onSuccess(data);
                } else {
                    // we have an error :( pass it to the error callback
                    failCallback.onFailed(error);
                }
            }

            @Override
            public void onDataFail(Exception error) {
                // another method of your API, it's an error, so use the failCallback
                failCallback.onFailed(error);
            }
        }),
        // cancelable here is the object your API returned
        // you just need to do whatever your API requires to cancel the execution
        cancelable -> () -> cancelable.cancel()
);
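To show the cancel-forwarding idea in isolation, here is a small sketch using only the JDK. It is an analogy rather than the gist's implementation: `CancelHandle`, `CancelableCall` and `CancelSketch` are hypothetical names, and a `CompletableFuture` stands in for the Observable. Canceling the future plays the part of unsubscribing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical cancel handle; cancel() may throw a checked exception. */
interface CancelHandle {
    void cancel() throws Exception;
}

/** Hypothetical cancelable call: starts work, reports once, returns a handle. */
interface CancelableCall<T> {
    CancelHandle start(BiConsumer<T, Exception> callback) throws Exception;
}

final class CancelSketch {
    private CancelSketch() {}

    static <T> CompletableFuture<T> toFuture(CancelableCall<T> call) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            CancelHandle handle = call.start((data, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                } else {
                    future.complete(data);
                }
            });
            // Runs when the future finishes for any reason; only a
            // cancellation has to be forwarded to the legacy API.
            future.whenComplete((data, error) -> {
                if (future.isCancelled() && handle != null) {
                    try {
                        handle.cancel();
                    } catch (Exception ignored) {
                        // Nobody is listening any more, so the failure is dropped.
                    }
                }
            });
        } catch (Exception e) {
            // The legacy call failed synchronously before returning a handle.
            future.completeExceptionally(e);
        }
        return future;
    }
}
```

As in the gist's mapper, a checked exception thrown while canceling is swallowed, since by then there is no consumer left to report it to.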

RxWrapCallbackAPI should be flexible enough to wrap any asynchronous API with a callback.

Not supported:

  • APIs that provide multiple values via different callbacks or at different times
  • APIs that may neither return nor fail
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;

import rx.Observable;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.internal.producers.SingleProducer;
import rx.subscriptions.Subscriptions;

/**
 *
 */
public class RxWrapCallbackAPI {

    static class ToCallbackSubscriber<DATA, CANCELABLE> implements Observable.OnSubscribe<DATA> {
        private final CancelableDataFactory<? extends DATA, CANCELABLE> cancelableDataFactory;
        private final CancelableMapper<CANCELABLE> cancelableMapper;

        public ToCallbackSubscriber(CancelableDataFactory<? extends DATA, CANCELABLE> cancelableDataFactory, CancelableMapper<CANCELABLE> cancelableMapper) {
            this.cancelableDataFactory = cancelableDataFactory;
            this.cancelableMapper = cancelableMapper;
        }

        @Override
        public void call(Subscriber<? super DATA> subscriber) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            try {
                CANCELABLE cancelable = cancelableDataFactory.asyncCall(
                        data -> {
                            if (subscriber.isUnsubscribed()) {
                                return;
                            }
                            subscriber.setProducer(new SingleProducer<>(subscriber, data));
                        },
                        error -> {
                            if (subscriber.isUnsubscribed()) {
                                return;
                            }
                            Exceptions.throwOrReport(error, subscriber);
                        });
                Closeable closeable = cancelableMapper.mapCancelable(cancelable);
                // cancel on unSubscription
                subscriber.add(Subscriptions.create(() -> {
                    if (closeable != null) {
                        try {
                            closeable.close();
                        } catch (Exception e) {
                        }
                    }
                }));
            } catch (Exception e) {
                Exceptions.throwOrReport(e, subscriber);
            }
        }
    }

    public interface SuccessCallback<DATA> {
        void onSuccess(@Nullable DATA data);
    }

    public interface FailCallback {
        void onFailed(@Nullable Exception error);
    }

    public interface Closeable {
        void close() throws Exception;
    }

    public interface CancelableMapper<CANCELABLE> {
        @NonNull
        Closeable mapCancelable(CANCELABLE cancelable);
    }

    public interface NonCancelableDataFactory<DATA> {
        void asyncCall(@NonNull SuccessCallback<DATA> successCallback, @NonNull FailCallback failCallback) throws Exception;
    }

    public interface CancelableDataFactory<DATA, CANCELABLE> {
        CANCELABLE asyncCall(@NonNull SuccessCallback<DATA> successCallback, @NonNull FailCallback failCallback) throws Exception;
    }

    private static final CancelableMapper<Void> DUMMY_CANCELABLE = new CancelableMapper<Void>() {
        private final Closeable DUMMY_CLOSABLE = () -> {
        };

        @NonNull
        @Override
        public Closeable mapCancelable(Void ignored) {
            return DUMMY_CLOSABLE;
        }
    };

    static <DATA, CANCELABLE> Observable.OnSubscribe<DATA> toCancelableAsyncSubscriber(
            CancelableDataFactory<? extends DATA, CANCELABLE> cancelableDataFactory,
            CancelableMapper<CANCELABLE> cancelableMapper) {
        return new ToCallbackSubscriber<>(cancelableDataFactory, cancelableMapper);
    }

    public static <DATA> Observable<DATA> wrapAPI(
            NonCancelableDataFactory<DATA> nonCancelableDataFactory) {
        CancelableDataFactory<? extends DATA, Void> fakeCancellable = (successCallback, failCallback) -> {
            nonCancelableDataFactory.asyncCall(successCallback, failCallback);
            return null;
        };
        return Observable.create(toCancelableAsyncSubscriber(fakeCancellable, DUMMY_CANCELABLE));
    }

    public static <DATA, CANCELABLE> Observable<DATA> wrapAPICancelable(
            CancelableDataFactory<? extends DATA, CANCELABLE> cancelableDataFactory,
            CancelableMapper<CANCELABLE> cancelableMapper) {
        return Observable.create(toCancelableAsyncSubscriber(cancelableDataFactory, cancelableMapper));
    }
}