Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Observable to F.Promise adapter
package play;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import play.libs.F;
import play.libs.ws.WS;
import play.libs.ws.WSResponse;
import play.mvc.Http;
import java.util.List;
/**
* Created by mati on 26/07/2014.
*/
public class MakeModelRefDataClient {
private final String baseUrl;
private final Gson gson;
public MakeModelRefDataClient(final String baseUrl, final Gson gson) {
this.baseUrl = baseUrl;
this.gson = gson;
}
public static class Makes {
public List<Make> makes;
}
public static class Models {
public List<Model> models;
}
public static class Make {
@SerializedName("n")
public String name;
@SerializedName("i")
public Integer id;
}
public static class Model {
@SerializedName("n")
public String name;
@SerializedName("i")
public Integer id;
@SerializedName("p")
public Integer parent;
}
public F.Promise<Makes> makes(final String category) {
return WS.url(String.format("%s/r/makes/%s", baseUrl, category)).get()
.map(response -> toDomainObject(Makes.class, response));
}
public F.Promise<Models> models(final Integer makeId) {
return WS.url(String.format("%s/r/models/%d", baseUrl, makeId)).get()
.map(response -> toDomainObject(Models.class, response));
}
private <T> T toDomainObject(final Class<T> clazz, final WSResponse response) {
if (response.getStatus() == Http.Status.OK) {
return gson.fromJson(response.getBody(), clazz);
}
throw new RuntimeException("Http call failed, status:" + response.getStatus());
}
}
package controllers;
import play.libs.F;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import java.util.Optional;
/**
* Created by mati on 26/07/2014.
*/
public class RxPlay<T> {
private final Observable<T> os;
public RxPlay(final Observable<T> os) {
this.os = os;
}
public static <E> F.Promise<E> toPromise(final Observable<E> obs) {
return new RxPlay(obs).adopt();
}
public F.Promise<T> adopt() {
final RxPlayObserver rxPlayObserver = new RxPlayObserver();
final Subscription subscription = os.subscribe(rxPlayObserver);
return rxPlayObserver.toFPromise().map(val -> {
subscription.unsubscribe();
return val;
});
}
private class RxPlayObserver implements Observer<T> {
private final F.RedeemablePromise<T> rPromise = F.RedeemablePromise.empty();
private Optional<T> data = Optional.empty();
public F.Promise<T> toFPromise() {
return rPromise;
}
@Override
public void onNext(final T data) {
this.data = Optional.ofNullable(data);
}
@Override
public void onCompleted() {
data.ifPresent(d -> rPromise.success(d));
}
@Override
public void onError(final Throwable t) {
rPromise.failure(t);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment