Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save eneim/fb9a9fe44e2d59b0fef109419bb6770f to your computer and use it in GitHub Desktop.
Save eneim/fb9a9fe44e2d59b0fef109419bb6770f to your computer and use it in GitHub Desktop.
OkHttp Observable Callback
/**
 * Downloads every {@code Thing} into the external cache directory and emits a single
 * {@code FilesWrapper} holding the resulting files once all of them have completed.
 *
 * <p>A file that already exists at the target path is reused without issuing a request.
 * Otherwise the request is enqueued on {@code client} and the response body is streamed
 * to disk from inside the {@code map} operator applied to the callback's observable.
 *
 * <p>Failures are delivered through {@code onError} as an {@code OnErrorThrowable} carrying
 * the offending {@code Thing} (body/HTTP/write failures) or the {@code Request} (transport
 * failures reported by the callback).
 *
 * @param things the items to download; each supplies a file name and a URL
 * @return an observable emitting one {@code FilesWrapper} with all downloaded files
 */
public Observable<FilesWrapper> download(List<Thing> things) {
    return Observable.from(things)
            .flatMap(thing -> {
                // NOTE(review): getExternalCacheDir() can presumably return null when external
                // storage is unavailable, which would yield a "null/<name>" path; confirm with caller.
                File file = new File(getExternalCacheDir() + File.separator + thing.getName());
                if (file.exists()) {
                    return Observable.just(file);
                }

                Request request = new Request.Builder().url(thing.getUrl()).build();
                final ObservableCallback callback = new ObservableCallback();
                client.newCall(request).enqueue(callback);

                return callback.getObservable().map(response -> {
                    // The body is closed on every path so the underlying connection is released.
                    try (ResponseBody body = response.body()) {
                        // Never cache an HTTP error page as if it were the requested file.
                        if (!response.isSuccessful()) {
                            throw new IOException("Unexpected HTTP " + response.code()
                                    + " for " + thing.getUrl());
                        }
                        try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {
                            sink.writeAll(body.source());
                        } catch (IOException writeFailure) {
                            // The sink is already closed here. Remove the truncated file so a
                            // later call does not mistake it for a complete cached download.
                            file.delete();
                            throw writeFailure;
                        }
                    } catch (IOException io) {
                        throw OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(io, thing));
                    }
                    return file;
                });
            })
            .toList()
            .map(files -> new FilesWrapper(files));
}
/**
 * Adapts the {@code Callback} interface to an {@code Observable<Response>} by forwarding
 * the callback's outcome into an {@code AsyncSubject}.
 *
 * <p>A response is pushed with {@code onNext} followed immediately by {@code onCompleted};
 * a failure is pushed with {@code onError}, wrapped so that the failed {@code Request}
 * travels with the exception.
 */
private static class ObservableCallback implements Callback {

    private final AsyncSubject<Response> responseSubject = AsyncSubject.create();

    /**
     * @return the subject exposed as a plain observable, hiding its observer side
     */
    public Observable<Response> getObservable() {
        return responseSubject.asObservable();
    }

    /** Emits the response and then completes the stream. */
    @Override
    public void onResponse(Response response) throws IOException {
        responseSubject.onNext(response);
        responseSubject.onCompleted();
    }

    /** Terminates the stream with the I/O error, tagged with the request that failed. */
    @Override
    public void onFailure(Request request, IOException e) {
        final Throwable tagged = OnErrorThrowable.addValueAsLastCause(e, request);
        responseSubject.onError(OnErrorThrowable.from(tagged));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment