// One client and one cancellation token are shared by both requests below.
HttpClient httpClient = HttpClient.createDefault();
CancellationToken cancellationToken = new CancellationToken();

// request_1: a GET whose outcome is delivered to getCallback.
HttpRequest getRequest = new HttpRequest(HttpMethod.GET, "https://httpbin.org/get");
HttpCallback getCallback = new HttpCallback() {
    @Override
    public void onSuccess(HttpResponse response) {
    }

    @Override
    public void onError(Throwable error) {
    }
};
httpClient.send(getRequest, cancellationToken, getCallback);

// request_2: a POST with an empty body, reported to postCallback.
HttpRequest postRequest = new HttpRequest(HttpMethod.POST, "https://httpbin.org/post", new byte[0]);
HttpCallback postCallback = new HttpCallback() {
    @Override
    public void onSuccess(HttpResponse response) {
    }

    @Override
    public void onError(Throwable error) {
    }
};
httpClient.send(postRequest, cancellationToken, postCallback);
// Build a pipeline that runs three policies in front of the default HTTP client.
HttpClient httpClient = HttpClient.createDefault();
HttpPipeline httpPipeline = new HttpPipelineBuilder()
    // Every policy is an object instance; AttachAuthPolicy was missing its `new`.
    .policies(new AttachDatePolicy(), new AttachAuthPolicy(authClient), new LogPolicy())
    .httpClient(httpClient)
    .build();

CancellationToken cancellationToken = new CancellationToken();
HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, "https://httpbin.org/get");

// Send through the pipeline; the outcome is delivered to the callback.
httpPipeline.send(httpRequest, Context.NONE, cancellationToken, new HttpCallback() {
    @Override
    public void onSuccess(HttpResponse response) { }

    @Override
    public void onError(Throwable error) { }
});
// Signature of HttpClient's send, written in Class::method notation (not compilable Java).
// It returns void; the outcome is reported through httpCallback.
void HttpClient::send(HttpRequest httpRequest,
CancellationToken cancellationToken,
HttpCallback httpCallback);
// Signature of HttpPipeline's send, written in Class::method notation (not compilable Java).
// Compared with HttpClient's send it takes one extra argument, a Context.
void HttpPipeline::send(HttpRequest httpRequest,
Context context,
CancellationToken cancellationToken,
HttpCallback httpCallback)
// HttpPipeline's send signature again (Class::method notation, not compilable Java):
// request, context, cancellation token, and the callback that receives the outcome.
void HttpPipeline::send(HttpRequest httpRequest,
Context context,
CancellationToken cancellationToken,
HttpCallback httpCallback)
/**
 * Skeleton policy showing what a policy can read from the chain it is given:
 * the request, the context and the cancellation token.
 */
final class MyPolicy implements HttpPipelinePolicy {
@Override
public void process(HttpPipelinePolicyChain chain) {
// The three values below are all obtained from the chain.
final HttpRequest request = chain.getRequest();
final Context context = chain.getContext();
final CancellationToken cancellationToken = chain.getCancellationToken();
// Placeholder for the policy's own logic (elided in this sample).
.....
}
}
/**
 * Pipeline policy that stamps the outgoing request with a "Date" header and then
 * passes the request on to the next policy in the chain.
 */
final class AttachDatePolicy implements HttpPipelinePolicy {
    @Override
    public void process(HttpPipelinePolicyChain chain) {
        HttpRequest httpRequest = chain.getRequest();
        // The HTTP Date header must be an IMF-fixdate in GMT
        // (e.g. "Sun, 06 Nov 1994 08:49:37 GMT"), not the ISO-8601 text that
        // OffsetDateTime.toString() produces. Locale.US keeps day/month names English.
        String httpDate = java.time.format.DateTimeFormatter
            .ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", java.util.Locale.US)
            .format(OffsetDateTime.now(java.time.ZoneOffset.UTC));
        httpRequest.getHeaders().put("Date", httpDate);
        chain.processNextPolicy(httpRequest);
    }
}
- Intercept request
- Call into next-policy
- Intercept response
- [Policy completed] Return to previous-policy
The following shows what these four steps look like in typical "OkHttp" and "Java SDK Reactor-based" policy implementations.
/**
 * OkHttp interceptor illustrating the four steps of a policy.
 *
 * @param chain the interceptor chain supplying the request and the next hop
 * @return the response obtained from the rest of the chain
 * @throws java.io.IOException rethrown from {@code chain.proceed} after being logged
 */
public okhttp3.Response intercept(okhttp3.Interceptor.Chain chain) throws java.io.IOException {
    // 1. Intercept request
    okhttp3.Request request = chain.request();
    try {
        // 2. Call into next-policy
        okhttp3.Response response = chain.proceed(request);
        // 3. Intercept response
        log(response.code());
        // 4. [Policy completed] Return to previous-policy
        return response;
    } catch (Throwable error) {
        // Log, then propagate unchanged. The rethrow carries proceed()'s checked
        // IOException, which is why the method must declare it.
        log(error);
        throw error;
    }
}
response content interception
try {
    // 2. Call into next-policy
    okhttp3.Response rawResponse = chain.proceed(request);
    // 3. Intercept response: work on a buffered version of it
    okhttp3.Response buffered = buffer(rawResponse);
    inspectContent(buffered);
    // 4. [Policy completed] Return to previous-policy with the buffered response
    return buffered;
} catch (Throwable failure) {
    // Log the failure and let it propagate to the previous policy.
    log(failure);
    throw failure;
}
api call during response interception
try {
    // 2. Call into next-policy
    okhttp3.Response upstream = chain.proceed(request);
    // 3. Intercept response: make a further (blocking) API call with it
    okhttp3.Response fromFoo = client.apiFoo(upstream);
    // 4. [Policy completed] Return to previous-policy with the result of that call
    return fromFoo;
} catch (Throwable failure) {
    // Log the failure and let it propagate to the previous policy.
    log(failure);
    throw failure;
}
/**
 * Reactor-based policy illustrating the four steps of a policy.
 *
 * @param context the call context the request is read from
 * @param next    handle used to invoke the next policy
 * @return the response publisher, with logging attached for both outcomes
 */
public Mono<HttpResponse> process(PipelineCallContext context, NextPolicy next) {
    // 1. Intercept request
    HttpRequest request = context.getRequest();
    // 2. Call into next-policy
    Mono<HttpResponse> responseMono = next.process();
    // 3. Intercept response
    responseMono = responseMono
        .doOnNext(response -> log(response.code()))
        .doOnError(error -> log(error));
    // 4. [Policy completed] Return to previous-policy
    //    (the declared return type now matches the Mono<HttpResponse> being returned)
    return responseMono;
}
response content interception
// 2. Call into next-policy
Mono<HttpResponse> upstream = next.process();
// 3. Intercept response: swap in a buffered response after inspecting its content
Mono<HttpResponse> intercepted = upstream
    .map(response -> {
        HttpResponse buffered = buffer(response);
        inspectContent(buffered);
        return buffered;
    })
    .doOnError(failure -> log(failure));
// 4. [Policy completed] Return to previous-policy
return intercepted;
api call during response interception
// 2. Call into next-policy
Mono<HttpResponse> responseMono = next.process();
// 3. Intercept response: chain a second asynchronous call.
//    The Reactor operator is spelled flatMap (capital M).
responseMono = responseMono
    .flatMap(response -> {
        Mono<HttpResponse> fooResponseMono = client.apiFoo(response);
        return fooResponseMono;
    })
    .doOnError(error -> log(error));
// 4. [Policy completed] Return to previous-policy
return responseMono;
Now that we have seen examples of native OkHttp and Java SDK policy implementations, the following shows how a policy would look if we used CompletableFuture.
Note that for Android, we want the policy to be async without depending on the API 24 CompletableFuture
or any external async framework. The sample below is only meant to give a feel for the structure of a CompletableFuture-based policy.
public CompletetableFuture<HttpResponse> intercept(PipelnePolicyChain chain) {
// 1. Intercept request
HttpRequest request = chain.request();
// 2. Call into next-policy
CompletetableFuture<HttpResponse> responseCF = chain.proceed(request);
// 3. Intercept response
responseCF = responseCF.whenComplete((response, error) -> {
if (response != null) {
log(response.code()));
} else {
log(error);
}
});
// 4. [Policy compeleted] Return to previous-policy
return responseCF;
}
response content interception
// 2. Call into next-policy
CompletableFuture<HttpResponse> responseCF = chain.proceed(request);
// 3. Intercept response
responseCF = responseCF.thenCompose((response) -> {
    HttpResponse bufferedResponse = buffer(response);
    inspectContent(bufferedResponse);
    // The static factory for an already-completed future is completedFuture(...).
    return CompletableFuture.completedFuture(bufferedResponse);
});
// 4. [Policy completed] Return to previous-policy
return responseCF;
api call during response interception
// 2. Call into next-policy
CompletableFuture<HttpResponse> responseCF = chain.proceed(request);
// 3. Intercept response: chain a second asynchronous call
responseCF = responseCF.thenComposeAsync((response) -> {
    CompletableFuture<HttpResponse> fooResponseCF = client.apiFoo(response);
    return fooResponseCF;
});
// 4. [Policy completed] Return to previous-policy
return responseCF;
This section covers the thought process behind the structure of policy in the Azure Android SDK core.
public void process(PipelnePolicyChain chain) {
// 1. Intercept request
HttpRequest request = chain.request();
// 2. Call into next-policy
chain.proceed(request, new HttpCallback() {
@Override
public void onSuccess(HttpResponse response) {
// 3. Intercept response
log(response.code());
}
@Override
public void onError(Throwable error) {
// 3. Intercept error response
log(error);
}
};
}
The above sample is async and callback-based. For the callback, it uses the HttpCallback
type, the same type that HttpClient
and HttpPipeline
use, but the design does not address how to achieve the 4th step - "[Policy completed] Return to previous-policy".
[Policy completed] Return to previous-policy
❌
A straightforward approach is to define a return type for onSuccess
and onError
: Tuple<HttpResponse, Throwable>
// 2. Call into next-policy
chain.processNextPolicy(request, new NextPolicyCallback() {
    @Override
    public Tuple<HttpResponse, Throwable> onSuccess(HttpResponse response) {
        // 3. Intercept response
        // 4. [Policy completed] Return to previous-policy:
        //    a response in the first slot, or a failure in the second.
        if (!isGood(response)) {
            return new Tuple<>(null, new Throwable("boom!"));
        }
        return new Tuple<>(response, null);
    }

    @Override
    public Tuple<HttpResponse, Throwable> onError(Throwable error) {
        // 3. Intercept error response
        // 4. [Policy completed] Return to previous-policy:
        //    pass the error on unless a cached response is to be used instead.
        if (!shouldUseCache) {
            return new Tuple<>(null, error);
        }
        HttpResponse cachedResponse = cacheStore.get(..);
        return new Tuple<>(cachedResponse, null);
    }
});
But this design has the following drawbacks:
- A Tuple is not a great developer experience.
- It forces
onSuccess | onError
to be synchronous.
Limitation of synchronous onSuccess | onError
The following code shows the limitation we hit if we force onSuccess
and onError
to be synchronous, i.e., we can't make an async call from these methods.
// 2. Call into next-policy
chain.processNextPolicy(request, new NextPolicyCallback() {
    @Override
    public Tuple<HttpResponse, Throwable> onSuccess(HttpResponse response) {
        // async-call
        client.apiFoo(response, new FooCallback() {
            void onFooComplete(BarResponse barResponse) {
                HttpResponse coreResponse = toCoreResponse(barResponse);
            }

            void onFooFailed(Throwable t) {
            }
        });
        // not completed yet!, so can't build Tuple and return.
        // (The missing return is intentional: it is the limitation being shown.)
    }

    @Override
    public Tuple<HttpResponse, Throwable> onError(Throwable error) {
        return new Tuple<>(null, error);
    }
}); // closes both the anonymous class and the processNextPolicy(...) call
So, ideally, the design for the 4th step - "[Policy completed] Return to previous-policy" should cover both sync and async work inside onSuccess
| onError
.
[Policy completed] Return to previous-policy {sync-completion}
// Sync completion: each callback finishes the policy before it returns, by
// returning the CompletionState produced by completer.completed / completedError.
chain.processNextPolicy(request, new NextPolicyCallback() {
    @Override
    public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {
        if (isGood(response)) {
            // Hand the response back to the previous policy.
            return completer.completed(response);
        } else {
            return completer.completedError(new Throwable("boom!"));
        }
    }

    @Override
    public CompletionState onError(Throwable error, PolicyCompleter completer) {
        if (shouldUseCache) {
            // Recover from the error by completing with a cached response.
            HttpResponse cachedResponse = cacheStore.get(..);
            return completer.completed(cachedResponse);
        } else {
            return completer.completedError(error);
        }
    }
}); // closes both the anonymous class and the processNextPolicy(...) call
[Policy completed] Return to previous-policy {async-completion}
// Async completion: onSuccess starts further asynchronous work, returns
// completer.defer(), and the policy is completed later from the FooCallback.
chain.processNextPolicy(request, new NextPolicyCallback() {
    @Override
    public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {
        client.apiFoo(response, new FooCallback() {
            void onFooComplete(BarResponse barResponse) {
                HttpResponse coreResponse = toCoreResponse(barResponse);
                completer.complete(coreResponse);
            }

            void onFooFailed(Throwable t) {
                completer.completeError(t);
            }
        });
        return completer.defer(); // completion is deferred
    }

    @Override
    public CompletionState onError(Throwable error, PolicyCompleter completer) {
        if (shouldUseCache) {
            // Recover from the error by completing with a cached response.
            HttpResponse cachedResponse = cacheStore.get(..);
            return completer.completed(cachedResponse);
        } else {
            return completer.completedError(error);
        }
    }
}); // closes both the anonymous class and the processNextPolicy(...) call
/**
 * Sample policy that sends the request with a Content-MD5 header and, when the
 * response status is 500, sends it one more time after a five second delay.
 */
class RetryOncePolicy implements HttpPipelinePolicy {
    @Override
    public void process(final HttpPipelinePolicyChain chain) {
        final HttpRequest httpRequest = CopyWithMD5(chain.getRequest());
        // The callbacks take a PolicyCompleter and return a CompletionState, which is
        // the NextPolicyCallback shape used by the other samples, not HttpCallback.
        chain.processNextPolicy(httpRequest, new NextPolicyCallback() {
            @Override
            public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {
                if (response.getStatusCode() == 500) {
                    // The helper needs this completer so that it can defer completion.
                    return retryOnceAfter5Sec(chain, completer);
                } else {
                    // Synchronous completion uses completed(...), as in the other samples.
                    return completer.completed(response);
                }
            }

            @Override
            public CompletionState onError(Throwable error, PolicyCompleter completer) {
                return completer.completedError(error);
            }
        });
    }

    /**
     * Schedules a single retry five seconds from now and defers completion.
     *
     * @param chain          the chain used to invoke the next policy again
     * @param outerCompleter the completer of the first attempt, used to defer it
     * @return the deferred completion state
     */
    private CompletionState retryOnceAfter5Sec(HttpPipelinePolicyChain chain,
                                               PolicyCompleter outerCompleter) {
        final HttpRequest httpRequest = CopyWithMD5(chain.getRequest());
        chain.processNextPolicy(httpRequest, new NextPolicyCallback() {
            @Override
            public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {
                return completer.completed(response);
            }

            @Override
            public CompletionState onError(Throwable error, PolicyCompleter completer) {
                return completer.completedError(error);
            }
        },
            5, // delay
            TimeUnit.SECONDS);
        return outerCompleter.defer();
    }

    /**
     * Creates a copy of the given request with the Content-MD5 header set.
     *
     * @param httpRequest the request to copy
     * @return the copy, carrying the MD5 of the original request
     */
    private HttpRequest CopyWithMD5(HttpRequest httpRequest) {
        // Copy the instance (not a static call) and return the request itself
        // rather than the result of put(...).
        // NOTE(review): assumes HttpRequest exposes an instance copy() - confirm.
        HttpRequest copy = httpRequest.copy();
        copy.getHeaders().put("Content-MD5", computeMD5(httpRequest));
        return copy;
    }
}
/**
 * Contract for the HTTP client that sends requests and reports each outcome
 * through a callback.
 */
public interface HttpClient {
    /**
     * Creates an HttpClient instance by delegating to {@code HttpClientProviders}.
     *
     * @return the created client
     */
    static HttpClient createDefault() {
        return HttpClientProviders.createInstance();
    }

    /**
     * @return the dispatcher associated with this client
     */
    HttpCallDispatcher getHttpCallDispatcher();

    /**
     * Sends a request; the outcome is delivered to {@code httpCallback}.
     *
     * @param httpRequest       the request to send
     * @param cancellationToken the cancellation token for this call
     * @param httpCallback      receives the response or the error
     */
    void send(HttpRequest httpRequest,
              CancellationToken cancellationToken,
              HttpCallback httpCallback);
}
/**
 * API outline of the dispatcher type returned by HttpClient.getHttpCallDispatcher().
 * Only signatures are listed; bodies are omitted, so this is not compilable as-is.
 */
public final class HttpCallDispatcher {
// No-argument constructor.
// NOTE(review): presumably creates default executors - confirm against the implementation.
public HttpCallDispatcher();
// Constructor taking a caller-supplied ExecutorService.
public HttpCallDispatcher(ExecutorService executorService);
// Constructor taking both an ExecutorService and a ScheduledExecutorService.
// NOTE(review): the scheduled executor presumably backs delayed calls - confirm.
public HttpCallDispatcher(ExecutorService executorService,
ScheduledExecutorService scheduledExecutorService);
// Enqueues an HTTP call described by httpCallFunction for httpRequest;
// httpCallback is the callback supplied for that call.
public void enqueue(HttpCallFunction httpCallFunction,
HttpRequest httpRequest,
CancellationToken cancellationToken,
HttpCallback httpCallback);
}