Skip to content

Instantly share code, notes, and snippets.

@anuchandy
Created February 9, 2021 19:19
Show Gist options
  • Save anuchandy/ce2319492824d548b5ed00a0529eb4ba to your computer and use it in GitHub Desktop.
Save anuchandy/ce2319492824d548b5ed00a0529eb4ba to your computer and use it in GitHub Desktop.

HttpClient usage:

HttpClient httpClient = HttpClient.createDefault();

CancellationToken cancellationToken = new CancellationToken();

// request_1
HttpRequest getRequest = new HttpRequest(HttpMethod.GET, "https://httpbin.org/get");

httpClient.send(getRequest, cancellationToken, new HttpCallback() {
    @Override
    public void onSuccess(HttpResponse response) { }

    @Override
    public void onError(Throwable error) { }
});

// request_2
HttpRequest postRequest = new HttpRequest(HttpMethod.POST, "https://httpbin.org/post", new byte[0]);

httpClient.send(postRequest, cancellationToken, new HttpCallback() {
    @Override
    public void onSuccess(HttpResponse response) { }

    @Override
    public void onError(Throwable error) { }
});

HttpPipeline usage:

HttpClient httpClient = HttpClient.createDefault();

HttpPipeline httpPipeline = new HttpPipelineBuilder()
    .policies(new AttachDatePolicy(), AttachAuthPolicy(authClient), new LogPolicy())
    .httpClient(httpClient)
    .build();

CancellationToken cancellationToken = new CancellationToken();
HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, "https://httpbin.org/get");

httpPipeline.send(httpRequest, Context.NONE, cancellationToken, new HttpCallback() {
    @Override
    public void onSuccess(HttpResponse response) { }

    @Override
    public void onError(Throwable error) { }
});

HttpClient and HttpPipeline signature:

void HttpClient::send(HttpRequest httpRequest,
                      CancellationToken cancellationToken,
                      HttpCallback httpCallback);


void HttpPipeline::send(HttpRequest httpRequest,
                        Context context,
                        CancellationToken cancellationToken,
                        HttpCallback httpCallback)

Access pipeline params in policy

void HttpPipeline::send(HttpRequest httpRequest,
                        Context context,
                        CancellationToken cancellationToken,
                        HttpCallback httpCallback)
final class MyPolicy implements HttpPipelinePolicy {

    @Override
    public void process(HttpPipelinePolicyChain chain) {
        final HttpRequest request = chain.getRequest();
        final Context context = chain.getContext();
        final CancellationToken cancellationToken = chain.getCancellationToken();

        .....
    }
}

Policy request-response interception:

intercept-only-request:

final class AttachDatePolicy implements HttpPipelinePolicy {
    @Override
    public void process(HttpPipelinePolicyChain chain) {
        HttpRequest httpRequest = chain.getRequest();
        httpRequest.getHeaders().put("Date", OffsetDateTime.now().toString());

        chain.processNextPolicy(httpRequest);
    }
}

intercept-[request]-response:

Policy Anatomy:

  1. Intercept request
  2. Call into next-policy
  3. Intercept response
  4. [Policy completed] Return to previous-policy

Below is shown how these four steps look like in typical "OKHttp" and "Java SDK Reactor-based" policy implementation.

Sync policy (Native OkHttp interceptor)

public okhttp3.Response intercept(okhttp3.Interceptor.Chain chain) {
    // 1. Intercept request
    okhttp3.Request request = chain.request();

    try {
        // 2. Call into next-policy
        okhttp3.Response response = chain.proceed(request);
        // 3. Intercept response
        log(response.code());
        // 4. [Policy completed] Return to previous-policy
        return response;
    } catch (Throwable error) {
        log(error);
        throw error;
    }
}
response content interception
try {
    // 2. Call into next-policy
    okhttp3.Response response = chain.proceed(request);
     // 3. Intercept response
    okhttp3.Response bufferedResponse = buffer(response);
    inspectContent(bufferedResponse);
     // 4. [Policy completed] Return to previous-policy
    return bufferedResponse;
} catch (Throwable error) {
    log(error);
    throw error;
}
api call during response interception
try {
    // 2. Call into next-policy
    okhttp3.Response response = chain.proceed(request);
     // 3. Intercept response
    okhttp3.Response fooResponse = client.apiFoo(response);
     // 4. [Policy completed] Return to previous-policy
    return fooResponse;
} catch (Throwable error) {
    log(error);
    throw error;
}

Async policy (Reactor: Azure Java SDK)

public Mono<Response> process(PiplelineCallContext context, NextPolicy next) {
    // 1. Intercept request
    HttpRequest request = context.getRequest();

    // 2. Call into next-policy
    Mono<HttpResponse> responseMono = next.process();
    // 3. Intercept response
    responseMono = responseMono
        .doOnNext(response -> log(response.code()))
        .doOnError(error -> log(error));
    // 4. [Policy completed] Return to previous-policy
    return responseMono;
}
response content interception
// 2. Call into next-policy
Mono<HttpResponse> responseMono = next.process();
// 3. Intercept response
responseMono = responseMono
    .map(response -> {
        HttpResponse bufferredResponse = buffer(response);
        inspectContent(bufferredResponse);
        return bufferredResponse;
    })
    .doOnError(error -> log(error));
// 4. [Policy completed] Return to previous-policy
return responseMono;
api call during response interception
// 2. Call into next-policy
Mono<HttpResponse> responseMono = next.process();
// 3. Intercept response
responseMono = responseMono
    .flatmap(response -> {
        Mono<HttpResponse> fooResponseMono = client.apiFoo(response);
        return fooResponseMono;
    })
    .doOnError(error -> log(error));
// 4. [Policy completed] Return to previous-policy
return responseMono;

Async policy (CompletableFuture)

Now that we have seen examples for native OkHttp and Java SDK policy implementations, below shown how policy looks had we use CompletableFuture.

Note that for Android, we want the policy to be async and not to depend on API24 CompletableFuture or any external async framework. The below sample is just to get a feeling of structure of CF based policy.

public CompletetableFuture<HttpResponse> intercept(PipelnePolicyChain chain) {
    // 1. Intercept request
    HttpRequest request = chain.request();

    // 2. Call into next-policy
    CompletetableFuture<HttpResponse> responseCF = chain.proceed(request);
    // 3. Intercept response
    responseCF = responseCF.whenComplete((response, error) -> {
        if (response != null) {
            log(response.code()));
        } else {
            log(error);
        }
    });
    // 4. [Policy compeleted] Return to previous-policy
    return responseCF;
}
response content interception
// 2. Call into next-policy
CompletetableFuture<HttpResponse> responseCF = chain.proceed(request);
// 3. Intercept response
responseCF = responseCF.thenCompose((response) -> {
    HttpResponse bufferredResponse = buffer(response);
    inspectContent(bufferredResponse);
    return CompletetableFuture.complete(bufferredResponse);
});
// 4. [Policy compeleted] Return to previous-policy
return responseCF;
api call during response interception
// 2. Call into next-policy
CompletetableFuture<HttpResponse> responseCF = chain.proceed(request);
// 3. Intercept response
responseCF = responseCF.thenComposeAsync((response) -> {
    CompletableFuture<HttpResponse> fooResponseCF = client.apiFoo(response);
    return fooResponseCF;
});
// 4. [Policy compeleted] Return to previous-policy
return responseCF;

Async policy (Callback: Azure Android SDK)

This section covers the thought process behind the structure of policy in the Azure Android SDK core.

public void process(PipelnePolicyChain chain) {
    // 1. Intercept request
    HttpRequest request = chain.request();

    // 2. Call into next-policy
    chain.proceed(request, new HttpCallback() {
        @Override
        public void onSuccess(HttpResponse response) {
            // 3. Intercept response
            log(response.code());
        }

        @Override
        public void onError(Throwable error) {
            // 3. Intercept error response
            log(error);
        }
    };
}

The above sample is async and callback based. For callback, it uses HttpCallback type, the same type HttpClient and HttpPipeline uses, but the design is not addressing how to achieve the 4th step - "[Policy completed] Return to previous-policy".

[Policy completed] Return to previous-policy

A straight forward approach is to define return type for onSuccess and onError :--- Tuple<HttpResponse, Throwable>

// 2. Call into next-policy
chain.processNextPolicy(request, new NextPolicyCallback() {

    @Override
    public Tuple<HttpResponse, Throwable> onSuccess(HttpResponse response) {
        // 3. Intercept response

        // 4.  [Policy completed] Return to previous-policy
        if (isGood(response)) {
            return new Tuple<>(response, null);
        } else {
            return new Tuple<>(null, new Throwable("boom!"));
        }
    }

    @Override
    public Tuple<HttpResponse, Throwable> onError(Throwable error) {
        // 3. Intercept error response

        // 4.  [Policy completed] Return to previous-policy
        if (shouldUseCache) {
            HttpResponse cachedResponse = cacheStore.get(..);
            return new Tuple<>(cachedResponse, null);
        } else {
            return new Tuple<>(null, error);
        }
    }
});

But the design has the following drawbacks;

  1. Tuple not great dev experience.
  2. Enforces onSucess | onError to be synchronous.
Limitation of synchronous onSucess | onError

The following code shows the limitation we hit if we force onSuccess and onError to be synchronous, i.e., we can't make an async call from these methods.

// 2. Call into next-policy
chain.processNextPolicy(request, new NextPolicyCallback() {

    @Override
    public Tuple<HttpResponse, Throwable> onSuccess(HttpResponse response) {
        // async-call
        client.apiFoo(response, new FooCallback() {
            void onFooComplete(BarResponse barResponse) {
                HttpResponse coreResponse = toCoreResponse(barResponse);
            }

            void onFooFailed(Throwable t) {

            }
        });
        // not completed yet!, so can't build Tuple and return.
    }

    @Override
    public Tuple<HttpResponse, Throwable> onError(Throwable error) { 
        return new Tuple<>(null, error); 
    }
}


PolicyCompleter

So, ideally, the design for the 4th step - "[Policy completed] Return to previous-policy" should cover both sync and async work inside onSuccess | onError.


[Policy completed] Return to previous-policy {sync-completion}
chain.processNextPolicy(request, new NextPolicyCallback() {

    @Override
    public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {
        if (isGood(response)) {
            return completer.completed(updatedResponse);
        } else {
            return completer.completedError(new Throwable("boom!"));
        }
    }

    @Override
    public CompletionState onError(Throwable error, PolicyCompleter completer) {
        if (shouldUseCache) {
            Response cachedResponse = cacheStore.get(..);
            return completer.completed(cachedResponse);
        } else {
            return completer.completedError(error);
        }
    }
}
[Policy completed] Return to previous-policy {async-completion}
chain.processNextPolicy(request, new NextPolicyCallback() {

    @Override
    public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {

        client.apiFoo(response, new FooCallback() {
            void onFooComplete(BarResponse barResponse) {
                HttpResponse coreResponse = toCoreResponse(barResponse);
                completer.complete(coreResponse);
            }

            void onFooFailed(Throwable t) {
                completer.completeError(t);
            }
        });
        
        return completer.defer(); // completion is deferred
    }

    @Override
    public CompletionState onError(Throwable error, PolicyCompleter completer) {
        if (shouldUseCache) {
            Response cachedResponse = cacheStore.get(..);
            return completer.completed(cachedResponse);
        } else {
            return completer.completedError(error);
        }
    }
}

Out of the box support for retry

class RetryOncePolicy implements HttpPipelinePolicy {
    @Override
    public void process(HttpPipelinePolicyChain chain) {
        final HttpRequest httpRequest = CopyWithMD5(chain.getRequest());

        chain.processNextPolicy(httpRequest, new HttpCallback() {
            @Override
            public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {
                if (response.getStatusCode() == 500) {
                    return retryOnceAfter5Sec(chain);
                } else {
                    return completer.complete(response);
                }
            }

            @Override
            public CompletionState onError(Throwable error, PolicyCompleter completer) {
                return completer.completedError(error);
            }
        });
    }

    // Do retry once asynchronously.
    private CompletionState retryOnceAfter5Sec(HttpPipelinePolicyChain chain) {
        final HttpRequest httpRequest = CopyWithMD5(chain.getRequest());

        chain.processNextPolicy(httpRequest, new HttpCallback() {
            @Override
            public CompletionState onSuccess(HttpResponse response, PolicyCompleter completer) {
                return completer.completed(response);
            }

            @Override
            public CompletionState onError(Throwable error, PolicyCompleter completer) {
                return completer.completedError(error);
            }
        }, 
        5,  // delay
        TimeUnit.SECONDS);

        return completer.defer();
    }

    // Create a copy of given request with MD5 header set.
    private HttpRequest CopyWithMD5(HttpRequest httpRequest) {
        return HttpRequest
          .ciopy()
          .getHeaders()
          .put("Content-MD5", computeMD5(httpRequest));
    }
}

HttpCallDispatcher:

One HttpClient and multiple HttpPipelines:

PipelineShareHttpClient

Async HttpPipeline run:

PipelineHttpDispatcher

public interface HttpClient {

    HttpCallDispatcher getHttpCallDispatcher();

    void send(HttpRequest httpRequest,
                      CancellationToken cancellationToken,
                      HttpCallback httpCallback);

    static HttpClient createDefault() {
        return HttpClientProviders.createInstance();
    }
}
public final class HttpCallDispatcher {
    public HttpCallDispatcher();
    public HttpCallDispatcher(ExecutorService executorService);
    public HttpCallDispatcher(ExecutorService executorService,
                              ScheduledExecutorService scheduledExecutorService);

    
    public void enqueue(HttpCallFunction httpCallFunction,
                        HttpRequest httpRequest,
                        CancellationToken cancellationToken,
                        HttpCallback httpCallback);
}

HttpClient(OkHttpClient) and HttpPipeline

OkHttp_Dispatcher_ShareES

HttpClient(HttpUrlConnection) and HttpPipeline

HttpUrlCon_Use_Dispatcher

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment