Skip to content

Instantly share code, notes, and snippets.

@ericcj
Created February 28, 2017 16:57
Show Gist options
  • Save ericcj/49118c877f471b3d8d53db545160e4ac to your computer and use it in GitHub Desktop.
Save ericcj/49118c877f471b3d8d53db545160e4ac to your computer and use it in GitHub Desktop.
An okhttp3.Call with a deadline timeout from the start of isExecuted until ResponseBody.source() is closed or unused.
package com.pushd.util;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.internal.http2.StreamResetException;
import okio.Buffer;
import okio.BufferedSource;
import okio.ForwardingSource;
import okio.Okio;
/**
* An okhttp3.Call with a deadline timeout from the start of isExecuted until ResponseBody.source() is closed or unused.
*/
public class DeadlineCall implements Call {
private final static Logger LOGGER = Logger.getLogger(DeadlineCall.class.getName());
private static AtomicInteger sFutures = new AtomicInteger();
private static final ScheduledExecutorService sHTTPCancelExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "DeadlineCallCancel");
t.setDaemon(true);
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
});
private final Call mUnderlying;
private final int mDeadlineTimeout;
private volatile ScheduledFuture mDeadline;
private volatile boolean mDeadlineHit;
private volatile boolean mCancelled;
private volatile BufferedSource mBodySource;
DeadlineCall(Call underlying, int deadlineTimeout) {
mUnderlying = underlying;
mDeadlineTimeout = deadlineTimeout;
}
/**
* Factory wrapper for OkHttpClient.newCall(request) to create a new DeadlineCall scheduled to cancel its underlying Call after the deadline.
* @param client
* @param request
* @param deadlineTimeout in ms
* @return Call
*/
public static DeadlineCall newDeadlineCall(@NonNull OkHttpClient client, @NonNull Request request, int deadlineTimeout) {
final Call underlying = client.newCall(request);
return new DeadlineCall(underlying, deadlineTimeout);
}
/**
* Shuts down thread that cancels calls when their deadline is hit.
*/
public static void shutdownNow() {
sHTTPCancelExecutorService.shutdownNow();
}
@Override
public Request request() {
return mUnderlying.request();
}
/**
* Response MUST be closed to clean up deadline even if body is not read, e.g. on !isSuccessful
* @return
* @throws IOException
*/
@Override
public Response execute() throws IOException {
startDeadline();
try {
return wrapResponse(mUnderlying.execute());
} catch (IOException e) {
cancelDeadline();
throw wrapIfDeadline(e);
}
}
/**
* Deadline is removed when onResponse returns unless response.body().source() or a method using
* it is called synchronously from onResponse to indicate caller's committment to close it themselves.
* This includes peekBody so prefer DeadlineResponseBody.peek unless you explicitly close after peekBody.
* @param responseCallback
*/
@Override
public void enqueue(final Callback responseCallback) {
startDeadline();
mUnderlying.enqueue(new Callback() {
@Override
public void onFailure(Call underlying, IOException e) {
cancelDeadline(); // there is no body to read so no need for deadline anymore
responseCallback.onFailure(DeadlineCall.this, wrapIfDeadline(e));
}
@Override
public void onResponse(Call underlying, Response response) throws IOException {
try {
responseCallback.onResponse(DeadlineCall.this, wrapResponse(response));
if (mBodySource == null) {
cancelDeadline(); // remove deadline if body was never opened
}
} catch (IOException e) {
cancelDeadline();
throw wrapIfDeadline(e);
}
}
});
}
private IOException wrapIfDeadline(IOException e) {
if (mDeadlineHit && isCancellationException(e)) {
return new DeadlineException(e);
}
return e;
}
public class DeadlineException extends IOException {
public DeadlineException(Throwable cause) {
super(cause);
}
}
/**
* Wraps response to cancelDeadline when response closed and throw correct DeadlineException when deadline happens during response reading.
* @param response
* @return
*/
private Response wrapResponse(final Response response) {
return response.newBuilder().body(new DeadlineResponseBody(response)).build();
}
public class DeadlineResponseBody extends ResponseBody {
private final Response mResponse;
DeadlineResponseBody(final Response response) {
mResponse = response;
}
@Override
public MediaType contentType() {
return mResponse.body().contentType();
}
@Override
public long contentLength() {
return mResponse.body().contentLength();
}
/**
* @return the body source indicating it will be closed later by the caller to cancel the deadline
*/
@Override
public BufferedSource source() {
if (mBodySource == null) {
mBodySource = Okio.buffer(new ForwardingSource(mResponse.body().source()) {
@Override
public long read(Buffer sink, long byteCount) throws IOException {
try {
return super.read(sink, byteCount);
} catch (IOException e) {
throw wrapIfDeadline(e);
}
}
@Override
public void close() throws IOException {
cancelDeadline();
super.close();
}
});
}
return mBodySource;
}
/**
* @return the body source without indicating it will be closed later by caller, e.g. to peekBody on unsucessful requests
*/
public BufferedSource peekSource() {
return mResponse.body().source();
}
/**
* Copy of https://square.github.io/okhttp/3.x/okhttp/okhttp3/Response.html#peekBody-long- that uses peekSource() since Response class is final
* @param byteCount
* @return
* @throws IOException
*/
public ResponseBody peek(long byteCount) throws IOException {
BufferedSource source = peekSource();
source.request(byteCount);
Buffer copy = source.buffer().clone();
// There may be more than byteCount bytes in source.buffer(). If there is, return a prefix.
Buffer result;
if (copy.size() > byteCount) {
result = new Buffer();
result.write(copy, byteCount);
copy.clear();
} else {
result = copy;
}
return ResponseBody.create(mResponse.body().contentType(), result.size(), result);
}
}
private void startDeadline() {
mDeadline = sHTTPCancelExecutorService.schedule(new Runnable() {
@Override
public void run() {
mDeadlineHit = true;
mUnderlying.cancel(); // calls onFailure or causes body read to throw
LOGGER.fine("Deadline hit for " + request()); // should trigger a subsequent wrapIfDeadline but if we see this log line without that it means the caller orphaned us without closing
}
}, mDeadlineTimeout, TimeUnit.MILLISECONDS);
LOGGER.fine("started deadline for " + request());
if (sFutures.incrementAndGet() == 1000) {
LOGGER.warning("1000 pending DeadlineCalls, may be leaking due to not calling close()");
}
}
private void cancelDeadline() {
if (mDeadline != null) {
mDeadline.cancel(false);
mDeadline = null;
sFutures.decrementAndGet();
LOGGER.fine("canceled deadline for " + request());
} else {
LOGGER.info("deadline already canceled for " + request());
}
}
@Override
public void cancel() {
mCancelled = true;
// should trigger onFailure or raise from execute or responseCallback.onResponse which will cancelDeadline
mUnderlying.cancel();
}
@Override
public boolean isExecuted() {
return mUnderlying.isExecuted();
}
@Override
public boolean isCanceled() {
return mCancelled;
}
@Override
public Call clone() {
return new DeadlineCall(mUnderlying.clone(), mDeadlineTimeout);
}
private static boolean isCancellationException(IOException e) {
// okhttp cancel from HTTP/2 calls
if (e instanceof StreamResetException) {
switch (((StreamResetException) e).errorCode) {
case CANCEL:
return true;
}
}
// https://android.googlesource.com/platform/external/okhttp/+/master/okhttp/src/main/java/com/squareup/okhttp/Call.java#281
if (e instanceof IOException &&
e.getMessage() != null && e.getMessage().equals("Canceled")) {
return true;
}
return false;
}
}
@NoHarmDan
Copy link

Hi, how do you use this with Retrofit2? (I mean, if you do, but I suppose you do, since a feature request thread on Retrofit lead me here) I can't exactly use this instead of retrofit2.Call and I can't replace it's inner okhttp3.Call instance, as I would have to basically hack the whole Retrofit library. Or am I completely missing something?

(Of course it could be altered to work with the retrofit2.Call... almost - I don't know of a construct similar to response.newBuilder().body(new DeadlineResponseBody(response)).build(); in retrofit2.Call. Or any other way to replace its okhttp3.ResponseBody...)

Thanks for any input - I'm desperately looking for a reliable way to put a universal timeout on a retrofit2.Call and this is the best thing I've seen so far.

@ericcj
Copy link
Author

ericcj commented Aug 12, 2017

@NoHarmDan we use okhttp directly so I'm not familiar with retrofit, but it sounds like you understand the nuts and bolts enough to create a pull request on retrofit that would support integrating DeadlineCall. could you link to the retrofit feature request from square/okhttp#2840 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment