Skip to content

Instantly share code, notes, and snippets.

@prabirshrestha
Last active October 10, 2018 16:06
Show Gist options
  • Save prabirshrestha/6324055 to your computer and use it in GitHub Desktop.
Save prabirshrestha/6324055 to your computer and use it in GitHub Desktop.
OkHttpClient+RxJava
package com.example.rx_okhttp;
import com.squareup.okhttp.OkHttpClient;
import org.apache.http.*;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicHttpResponse;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subscriptions.BooleanSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func1;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.CancellationException;
import static org.apache.http.HttpVersion.HTTP_1_1;
/**
 * Executes Apache {@link HttpRequest}s over an {@link OkHttpClient} connection and exposes the
 * result as an {@link Observable} that emits a single {@link HttpResponse}.
 *
 * <p>The blocking part of the exchange (streaming the request body, reading the status line and
 * headers) is run on a {@link Scheduler}. The response body is not consumed here: it is handed to
 * the subscriber as a streaming entity backed by the connection's input stream.
 */
public class HttpClient {
    /** Size of the buffer used to copy the request body to the connection. */
    private static final int DEFAULT_BUFFER_SIZE = 1024;

    /**
     * Largest fixed-length request body, in bytes, that is left for the connection to buffer
     * instead of being streamed.
     */
    private static final int MAX_BUFFERED_BODY_SIZE = 8192;

    private final OkHttpClient httpClient;
    private final Scheduler httpScheduler;

    /** Returns the underlying {@link OkHttpClient} used to open connections. */
    public OkHttpClient getOkHttpClient() {
        return httpClient;
    }

    /**
     * Creates a client that opens connections with {@code httpClient} and, by default, performs
     * the blocking work on {@code httpScheduler}.
     *
     * @param httpClient    client used to open connections
     * @param httpScheduler scheduler used by {@link #execute(HttpRequest)}
     */
    public HttpClient(OkHttpClient httpClient, Scheduler httpScheduler) {
        this.httpClient = httpClient;
        this.httpScheduler = httpScheduler;
    }

    /** Creates a client that uses {@code httpClient} and {@code Schedulers.threadPoolForIO()}. */
    public HttpClient(OkHttpClient httpClient) {
        this(httpClient, Schedulers.threadPoolForIO());
    }

    /** Creates a client with a new {@link OkHttpClient} and {@code Schedulers.threadPoolForIO()}. */
    public HttpClient() {
        this(new OkHttpClient(), Schedulers.threadPoolForIO());
    }

    /**
     * Executes {@code request} on this client's default scheduler.
     *
     * @param request the request to send
     * @return an observable emitting the response, see {@link #execute(HttpRequest, Scheduler)}
     */
    public Observable<HttpResponse> execute(final HttpRequest request) {
        return execute(request, this.httpScheduler);
    }

    /**
     * Executes {@code request}, performing the blocking exchange on {@code httpScheduler}.
     *
     * <p>On subscription the connection is opened and configured (method and headers) on the
     * subscribing thread; the request body is then written and the response head read on the
     * scheduler. The observer receives exactly one {@code onNext} followed by
     * {@code onCompleted}, or a single {@code onError}. If the subscription is unsubscribed
     * before the response is emitted, the observer receives a {@link CancellationException}
     * through {@code onError}.
     *
     * @param request       the request to send
     * @param httpScheduler scheduler on which the blocking exchange runs
     * @return an observable emitting the response; its entity is absent when the connection
     *         provides no body stream
     */
    public Observable<HttpResponse> execute(final HttpRequest request, final Scheduler httpScheduler) {
        return Observable.create(new Func1<Observer<HttpResponse>, Subscription>() {
            @Override
            public Subscription call(final Observer<HttpResponse> observer) {
                final BooleanSubscription subscription = new BooleanSubscription();
                try {
                    RequestLine requestLine = request.getRequestLine();
                    URL url = new URL(requestLine.getUri());
                    final HttpURLConnection connection = httpClient.open(url);
                    connection.setRequestMethod(requestLine.getMethod());
                    for (Header header : request.getAllHeaders()) {
                        connection.addRequestProperty(header.getName(), header.getValue());
                    }
                    httpScheduler.schedule(new Action0() {
                        @Override
                        public void call() {
                            final HttpResponse response;
                            try {
                                throwIfCancelled(subscription);
                                // Stream the request body.
                                if (request instanceof HttpEntityEnclosingRequest) {
                                    HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
                                    if (entity != null) {
                                        writeRequestBody(connection, entity, subscription);
                                    }
                                }
                                throwIfCancelled(subscription);
                                response = readResponse(connection);
                            } catch (Exception e) {
                                // Nothing was emitted, so nobody will consume the connection:
                                // release it before reporting the failure.
                                connection.disconnect();
                                observer.onError(e);
                                return;
                            }
                            try {
                                observer.onNext(response);
                            } catch (Exception e) {
                                observer.onError(e);
                                return;
                            }
                            observer.onCompleted();
                        }
                    });
                } catch (Exception e) {
                    observer.onError(e);
                }
                return subscription;
            }
        });
    }

    /** Throws {@link CancellationException} if {@code subscription} has been unsubscribed. */
    private static void throwIfCancelled(BooleanSubscription subscription) {
        if (subscription.isUnsubscribed()) {
            throw new CancellationException("Request Cancelled");
        }
    }

    /**
     * Configures {@code connection} for output and copies {@code entity}'s content to it,
     * checking for cancellation between chunks.
     */
    private static void writeRequestBody(
            HttpURLConnection connection, HttpEntity entity, BooleanSubscription subscription)
            throws java.io.IOException {
        connection.setDoOutput(true);
        Header type = entity.getContentType();
        if (type != null) {
            connection.addRequestProperty(type.getName(), type.getValue());
        }
        Header encoding = entity.getContentEncoding();
        if (encoding != null) {
            connection.addRequestProperty(encoding.getName(), encoding.getValue());
        }

        long length = entity.getContentLength();
        if (entity.isChunked() || length < 0 || length > Integer.MAX_VALUE) {
            // Unknown length, or a length that does not fit the int-based fixed-length API
            // (casting it would silently truncate): stream in chunks instead.
            connection.setChunkedStreamingMode(0);
        } else if (length <= MAX_BUFFERED_BODY_SIZE) {
            // Buffer short, fixed-length request bodies. This costs memory, but permits the request
            // to be transparently retried if there is a connection failure.
            connection.addRequestProperty("Content-Length", Long.toString(length));
        } else {
            connection.setFixedLengthStreamingMode((int) length);
        }

        // Obtain the content stream exactly once: repeatable entities return a fresh stream from
        // every getContent() call, so calling it per iteration would re-read the start forever.
        InputStream content = entity.getContent();
        try {
            OutputStream output = connection.getOutputStream();
            byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
            int n;
            while (-1 != (n = content.read(buffer))) {
                throwIfCancelled(subscription);
                output.write(buffer, 0, n);
            }
            // Closed on the success path only; on failure the caller disconnects the connection,
            // and closing a partially written fixed-length stream would mask the real error.
            output.close();
        } finally {
            content.close();
        }
    }

    /**
     * Reads the status line and headers from {@code connection} and wraps the body stream, if
     * any, in a streaming entity.
     */
    private static HttpResponse readResponse(HttpURLConnection connection)
            throws java.io.IOException {
        // Read the response headers.
        int responseCode = connection.getResponseCode();
        String message = connection.getResponseMessage();
        BasicHttpResponse response = new BasicHttpResponse(HTTP_1_1, responseCode, message);

        // Get the response body ready to stream.
        InputStream responseBody =
                responseCode < HttpURLConnection.HTTP_BAD_REQUEST
                        ? connection.getInputStream()
                        : connection.getErrorStream();
        // getErrorStream() may return null when the server sent no error body; leave the
        // response without an entity in that case rather than failing the whole request.
        InputStreamEntity entity = responseBody == null
                ? null
                : new InputStreamEntity(responseBody, connection.getContentLength());

        for (int i = 0; true; i++) {
            String name = connection.getHeaderFieldKey(i);
            if (name == null) {
                break;
            }
            BasicHeader header = new BasicHeader(name, connection.getHeaderField(i));
            response.addHeader(header);
            if (entity != null) {
                if (name.equalsIgnoreCase("Content-Type")) {
                    entity.setContentType(header);
                } else if (name.equalsIgnoreCase("Content-Encoding")) {
                    entity.setContentEncoding(header);
                }
            }
        }
        if (entity != null) {
            response.setEntity(entity);
        }
        return response;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment