Skip to content

Instantly share code, notes, and snippets.

@g2vinay
Last active June 7, 2022 20:16
Show Gist options
  • Save g2vinay/f88239caec464bdc0080f6df06b81421 to your computer and use it in GitHub Desktop.
Save g2vinay/f88239caec464bdc0080f6df06b81421 to your computer and use it in GitHub Desktop.
Sync Stack Java

Sync Stack

image

Sync Over Async Flow.

image

Sync Stack Flow

image

Sync Rest Proxy

image

image

image

image

Sync Rest Proxy

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.http.rest;

import com.azure.core.exception.HttpResponseException;
import com.azure.core.exception.UnexpectedLengthException;
import com.azure.core.http.*;
import com.azure.core.http.policy.CookiePolicy;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.policy.RetryPolicy;
import com.azure.core.http.policy.UserAgentPolicy;
import com.azure.core.implementation.AccessibleByteArrayOutputStream;
import com.azure.core.implementation.TypeUtil;
import com.azure.core.implementation.http.UnexpectedExceptionInformation;
import com.azure.core.implementation.serializer.HttpResponseDecoder;
import com.azure.core.implementation.serializer.HttpResponseDecoder.HttpDecodedResponse;
import com.azure.core.implementation.util.*;
import com.azure.core.util.*;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JacksonAdapter;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.serializer.SerializerEncoding;
import com.azure.core.util.tracing.Tracer;
import com.azure.core.util.tracing.TracerProxy;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.util.context.ContextView;

import java.io.*;
import java.lang.invoke.MethodHandle;
import java.lang.reflect.*;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

import static com.azure.core.implementation.serializer.HttpResponseBodyDecoder.shouldEagerlyReadResponse;

/**
 * Type to create a proxy implementation for an interface describing REST API methods.
 *
 * RestProxy can create proxy implementations for interfaces with methods that return deserialized Java objects as well
 * as asynchronous Single objects that resolve to a deserialized Java object.
 */
public final class SyncRestProxy implements InvocationHandler {
    private static final ByteBuffer VALIDATION_BUFFER = ByteBuffer.allocate(0);
    private static final String BODY_TOO_LARGE = "Request body emitted %d bytes, more than the expected %d bytes.";
    private static final String BODY_TOO_SMALL = "Request body emitted %d bytes, less than the expected %d bytes.";
    private static final String MUST_IMPLEMENT_PAGE_ERROR =
        "Unable to create PagedResponse<T>. Body must be of a type that implements: " + Page.class;

    private static final ResponseConstructorsCache RESPONSE_CONSTRUCTORS_CACHE = new ResponseConstructorsCache();

    // RestProxy is a commonly used class, use a static logger.
    private static final ClientLogger LOGGER = new ClientLogger(SyncRestProxy.class);

    private final HttpPipeline httpPipeline;
    private final SerializerAdapter serializer;
    private final SwaggerInterfaceParser interfaceParser;
    private final HttpResponseDecoder decoder;

    /**
     * Create a RestProxy.
     *
     * @param httpPipeline the HttpPipelinePolicy and HttpClient httpPipeline that will be used to send HTTP requests.
     * @param serializer the serializer that will be used to convert response bodies to POJOs.
     * @param interfaceParser the parser that contains information about the interface describing REST API methods that
     * this RestProxy "implements".
     */
    private SyncRestProxy(HttpPipeline httpPipeline, SerializerAdapter serializer, SwaggerInterfaceParser interfaceParser) {
        this.httpPipeline = httpPipeline;
        this.serializer = serializer;
        this.interfaceParser = interfaceParser;
        this.decoder = new HttpResponseDecoder(this.serializer);
    }

Send API

    /**
     * Send the provided request asynchronously, applying any request policies provided to the HttpClient instance.
     *
     * @param request the HTTP request to send
     * @param contextData the context
     * @return a {@link Mono} that emits HttpResponse asynchronously
     */
    public HttpResponse send(HttpRequest request, Context contextData) {
        return httpPipeline.sendSynchronously(request, contextData);
    }

Invoke API

  @Override
    public Object invoke(Object proxy, final Method method, Object[] args) {
        validateResumeOperationIsNotPresent(method);

        final SwaggerMethodParser methodParser = getMethodParser(method);
        HttpRequest request;
        try {
            request = createHttpRequest(methodParser, args);
        } catch (IOException e) {
            throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
        }

        Context context = methodParser.setContext(args);

        RequestOptions options = methodParser.setRequestOptions(args);
        context = mergeRequestOptionsContext(context, options);

        context = context.addData("caller-method", methodParser.getFullyQualifiedMethodName())
            .addData("azure-eagerly-read-response", shouldEagerlyReadResponse(methodParser.getReturnType()));

        HttpDecodedResponse decodedResponse = null;
        Throwable throwable = null;
        try {
            context = startTracingSpan(method, context);

            // If there is 'RequestOptions' apply its request callback operations before validating the body.
            // This is because the callbacks may mutate the request body.
            if (options != null) {
                options.getRequestCallback().accept(request);
            }

            if (request.getBody() != null) {
                request.setContent(validateLengthSync(request));
            }

            final HttpResponse response = send(request, context);

            decodedResponse = this.decoder.decodeSync(response, methodParser);

            return handleRestReturnType(decodedResponse, methodParser,
                methodParser.getReturnType(), context, options);
        } catch (Exception e) {
            throwable = e;
            throw new RuntimeException(e);
        } finally {
            if (decodedResponse != null || throwable != null) {
                endTracingSpan(decodedResponse, throwable, context);
            }
        }
    }

Request Setup

    /**
     * Create a HttpRequest for the provided Swagger method using the provided arguments.
     *
     * @param methodParser the Swagger method parser to use
     * @param args the arguments to use to populate the method's annotation values
     * @return a HttpRequest
     * @throws IOException thrown if the body contents cannot be serialized
     */
    private HttpRequest createHttpRequest(SwaggerMethodParser methodParser, Object[] args) throws IOException {
        // Sometimes people pass in a full URL for the value of their PathParam annotated argument.
        // This definitely happens in paging scenarios. In that case, just use the full URL and
        // ignore the Host annotation.
        final String path = methodParser.setPath(args);
        final UrlBuilder pathUrlBuilder = UrlBuilder.parse(path);

        final UrlBuilder urlBuilder;
        if (pathUrlBuilder.getScheme() != null) {
            urlBuilder = pathUrlBuilder;
        } else {
            urlBuilder = new UrlBuilder();

            methodParser.setSchemeAndHost(args, urlBuilder);

            // Set the path after host, concatenating the path
            // segment in the host.
            if (path != null && !path.isEmpty() && !"/".equals(path)) {
                String hostPath = urlBuilder.getPath();
                if (hostPath == null || hostPath.isEmpty() || "/".equals(hostPath) || path.contains("://")) {
                    urlBuilder.setPath(path);
                } else {
                    if (path.startsWith("/")) {
                        urlBuilder.setPath(hostPath + path);
                    } else {
                        urlBuilder.setPath(hostPath + "/" + path);
                    }
                }
            }
        }

        methodParser.setEncodedQueryParameters(args, urlBuilder);

        final URL url = urlBuilder.toUrl();
        final HttpRequest request = configRequest(new HttpRequest(methodParser.getHttpMethod(), url),
            methodParser, args);

        // Headers from Swagger method arguments always take precedence over inferred headers from body types
        HttpHeaders httpHeaders = request.getHeaders();
        methodParser.setHeaders(args, httpHeaders);

        return request;
    }

    @SuppressWarnings("unchecked")
    private HttpRequest configRequest(final HttpRequest request, final SwaggerMethodParser methodParser,
        final Object[] args) throws IOException {
        final Object bodyContentObject = methodParser.setBody(args);
        if (bodyContentObject == null) {
            request.getHeaders().set("Content-Length", "0");
        } else {
            // We read the content type from the @BodyParam annotation
            String contentType = methodParser.getBodyContentType();

            // If this is null or empty, the service interface definition is incomplete and should
            // be fixed to ensure correct definitions are applied
            if (contentType == null || contentType.isEmpty()) {
                if (bodyContentObject instanceof byte[] || bodyContentObject instanceof String) {
                    contentType = ContentType.APPLICATION_OCTET_STREAM;
                } else {
                    contentType = ContentType.APPLICATION_JSON;
                }
            }

            request.getHeaders().set("Content-Type", contentType);
            if (bodyContentObject instanceof BinaryData) {
                BinaryData binaryData = (BinaryData) bodyContentObject;
                if (binaryData.getLength() != null) {
                    request.setHeader("Content-Length", binaryData.getLength().toString());
                }
                // The request body is not read here. The call to `toFluxByteBuffer()` lazily converts the underlying
                // content of BinaryData to a Flux<ByteBuffer> which is then read by HttpClient implementations when
                // sending the request to the service. There is no memory copy that happens here. Sources like
                // InputStream, File and Flux<ByteBuffer> will not be eagerly copied into memory until it's required
                // by the HttpClient implementations.
                request.setContent(binaryData);
                return request;
            }

            // TODO(jogiles) this feels hacky
            boolean isJson = false;
            final String[] contentTypeParts = contentType.split(";");
            for (final String contentTypePart : contentTypeParts) {
                if (contentTypePart.trim().equalsIgnoreCase(ContentType.APPLICATION_JSON)) {
                    isJson = true;
                    break;
                }
            }

            if (isJson) {
                ByteArrayOutputStream stream = new AccessibleByteArrayOutputStream();
                serializer.serialize(bodyContentObject, SerializerEncoding.JSON, stream);

                request.setHeader("Content-Length", String.valueOf(stream.size()));
                request.setContent(BinaryData.fromStream(new ByteArrayInputStream(stream.toByteArray(), 0, stream.size())));

            } else if (bodyContentObject instanceof byte[]) {
                request.setBody((byte[]) bodyContentObject);
            } else if (bodyContentObject instanceof String) {
                final String bodyContentString = (String) bodyContentObject;
                if (!bodyContentString.isEmpty()) {
                    request.setBody(bodyContentString);
                }
            } else if (bodyContentObject instanceof ByteBuffer) {
                request.setBody(((ByteBuffer) bodyContentObject).array());
            } else {
                ByteArrayOutputStream stream = new AccessibleByteArrayOutputStream();
                serializer.serialize(bodyContentObject, SerializerEncoding.fromHeaders(request.getHeaders()), stream);

                request.setHeader("Content-Length", String.valueOf(stream.size()));
                request.setBody(stream.toByteArray());
            }
        }

        return request;
    }

Response Handling

    private Object handleRestResponseReturnType(final HttpDecodedResponse response,
        final SwaggerMethodParser methodParser,
        final Type entityType) {
        if (TypeUtil.isTypeOrSubTypeOf(entityType, Response.class)) {
            final Type bodyType = TypeUtil.getRestResponseBodyType(entityType);

            if (TypeUtil.isTypeOrSubTypeOf(bodyType, Void.class)) {
                response.getSourceResponse().getContent().toBytes();
                return createResponseSync(response, entityType, null);
            } else {
                Object bodyAsObject =  handleBodyReturnTypeSync(response, methodParser, bodyType);
                Response<?> httpResponse = createResponseSync(response, entityType, bodyAsObject);
                if (httpResponse == null) {
                    return createResponseSync(response, entityType, null);
                }
                return httpResponse;
            }
        } else {
            // For now, we're just throwing if the Maybe didn't emit a value.
            return handleBodyReturnTypeSync(response, methodParser, entityType);
        }
    }


    @SuppressWarnings("unchecked")
    private Response<?> createResponseSync(HttpDecodedResponse response, Type entityType, Object bodyAsObject) {
        final Class<? extends Response<?>> cls = (Class<? extends Response<?>>) TypeUtil.getRawClass(entityType);

        final HttpResponse httpResponse = response.getSourceResponse();
        final HttpRequest request = httpResponse.getRequest();
        final int statusCode = httpResponse.getStatusCode();
        final HttpHeaders headers = httpResponse.getHeaders();
        final Object decodedHeaders = response.getDecodedHeaders();
        // Inspection of the response type needs to be performed to determine which course of action should be taken to
        // instantiate the Response<?> from the HttpResponse.
        //
        // If the type is either the Response or PagedResponse interface from azure-core a new instance of either
        // ResponseBase or PagedResponseBase can be returned.
        if (cls.equals(Response.class)) {
            // For Response return a new instance of ResponseBase cast to the class.
            return cls.cast(new ResponseBase<>(request, statusCode, headers, bodyAsObject, decodedHeaders));
        } else if (cls.equals(PagedResponse.class)) {
            // For PagedResponse return a new instance of PagedResponseBase cast to the class.
            //
            // PagedResponse needs an additional check that the bodyAsObject implements Page.
            //
            // If the bodyAsObject is null use the constructor that take items and continuation token with null.
            // Otherwise, use the constructor that take Page.
            if (bodyAsObject != null && !TypeUtil.isTypeOrSubTypeOf(bodyAsObject.getClass(), Page.class)) {
                throw LOGGER.logExceptionAsError(new RuntimeException(MUST_IMPLEMENT_PAGE_ERROR));
            } else if (bodyAsObject == null) {
                return (cls.cast(new PagedResponseBase<>(request, statusCode, headers, null, null,
                    decodedHeaders)));
            } else {
                return (cls.cast(new PagedResponseBase<>(request, statusCode, headers, (Page<?>) bodyAsObject,
                    decodedHeaders)));
            }
        }

        // Otherwise, rely on reflection, for now, to get the best constructor to use to create the Response sub-type.
        //
        // Ideally, in the future the SDKs won't need to dabble in reflection here as the Response sub-types should be
        // given a way to register their constructor as a callback method that consumes HttpDecodedResponse and the
        // body as an Object.

        MethodHandle ctr = RESPONSE_CONSTRUCTORS_CACHE.get(cls);

        if (ctr == null) {
            throw new RuntimeException("Cannot find suitable constructor for class " + cls);
        }
        return RESPONSE_CONSTRUCTORS_CACHE.invokeSync(ctr, response, bodyAsObject);
    }

    private Object handleBodyReturnTypeSync(final HttpDecodedResponse response,
                                         final SwaggerMethodParser methodParser, final Type entityType) {
        final int responseStatusCode = response.getSourceResponse().getStatusCode();
        final HttpMethod httpMethod = methodParser.getHttpMethod();
        final Type returnValueWireType = methodParser.getReturnValueWireType();

        final Object result;
        if (httpMethod == HttpMethod.HEAD
            && (TypeUtil.isTypeOrSubTypeOf(
            entityType, Boolean.TYPE) || TypeUtil.isTypeOrSubTypeOf(entityType, Boolean.class))) {
            boolean isSuccess = (responseStatusCode / 100) == 2;
            result = isSuccess;
        } else if (TypeUtil.isTypeOrSubTypeOf(entityType, byte[].class)) {
            // byte[]
            byte[] responseBodyBytes = response.getSourceResponse().getContent().toBytes();
            if (returnValueWireType == Base64Url.class) {
                // Base64Url
                responseBodyBytes = new Base64Url(responseBodyBytes).decodedBytes();
            }
            result = responseBodyBytes;
        }  else if (TypeUtil.isTypeOrSubTypeOf(entityType, BinaryData.class)) {
            // BinaryData
            // The raw response is directly used to create an instance of BinaryData which then provides
            // different methods to read the response. The reading of the response is delayed until BinaryData
            // is read and depending on which format the content is converted into, the response is not necessarily
            // fully copied into memory resulting in lesser overall memory usage.
            result = response.getSourceResponse().getContent();
        } else {
            // Mono<Object> or Mono<Page<T>>
            result = response.getDecodedBodySync((byte[]) null);
        }
        return result;
    }

    /**
     * Handle the provided asynchronous HTTP response and return the deserialized value.
     *
     * @param httpDecodedResponse the asynchronous HTTP response to the original HTTP request
     * @param methodParser the SwaggerMethodParser that the request originates from
     * @param returnType the type of value that will be returned
     * @param context Additional context that is passed through the Http pipeline during the service call.
     * @return the deserialized result
     */
    private Object handleRestReturnType(final HttpDecodedResponse httpDecodedResponse,
        final SwaggerMethodParser methodParser,
        final Type returnType,
        final Context context,
        final RequestOptions options) {
        final HttpDecodedResponse expectedResponse =
            ensureExpectedStatus(httpDecodedResponse, methodParser, options);
	    
        final Object result;

        if (TypeUtil.isTypeOrSubTypeOf(returnType, void.class) || TypeUtil.isTypeOrSubTypeOf(returnType,
            Void.class)) {
            // ProxyMethod ReturnType: Void
            result = expectedResponse;
        } else {
            // ProxyMethod ReturnType: T where T != async (Mono, Flux) or sync Void
            // Block the deserialization until a value T is received
            result = handleRestResponseReturnType(httpDecodedResponse, methodParser, returnType);
        }
        return result;
    }

Tracing Handling

    /**
     * Starts the tracing span for the current service call, additionally set metadata attributes on the span by passing
     * additional context information.
     *
     * @param method Service method being called.
     * @param context Context information about the current service call.
     * @return The updated context containing the span context.
     */
    private Context startTracingSpan(Method method, Context context) {
        // First check if tracing is enabled. This is an optimized operation, so it is done first.
        if (!TracerProxy.isTracingEnabled()) {
            return context;
        }

        // Then check if this method disabled tracing. This requires walking a linked list, so do it last.
        if ((boolean) context.getData(Tracer.DISABLE_TRACING_KEY).orElse(false)) {
            return context;
        }

        String spanName = interfaceParser.getServiceName() + "." + method.getName();
        context = TracerProxy.setSpanName(spanName, context);
        return TracerProxy.start(spanName, context);
    }

    // This handles each onX for the response mono.
    // The signal indicates the status and contains the metadata we need to end the tracing span.
    private static void endTracingSpan(HttpDecodedResponse httpDecodedResponse, Throwable throwable, Context tracingContext) {


        // Get the context that was added to the mono, this will contain the information needed to end the span.
        Object disableTracingValue = (tracingContext.getData(Tracer.DISABLE_TRACING_KEY).isPresent()
            ? tracingContext.getData(Tracer.DISABLE_TRACING_KEY).get() : null);
        boolean disableTracing = Boolean.TRUE.equals(disableTracingValue != null ? disableTracingValue : false);

        if (tracingContext == null || disableTracing) {
            return;
        }

        int statusCode = 0;

        // On next contains the response information.
        if (httpDecodedResponse != null) {
            //noinspection ConstantConditions
            statusCode = httpDecodedResponse.getSourceResponse().getStatusCode();
        } else if (throwable != null) {
            // The last status available is on error, this contains the error thrown by the REST response.
            // Only HttpResponseException contain a status code, this is the base REST response.
            if (throwable instanceof HttpResponseException) {
                HttpResponseException exception = (HttpResponseException) throwable;
                statusCode = exception.getResponse().getStatusCode();
            }
        }
        TracerProxy.end(statusCode, throwable, tracingContext);
    }

Helper Methods

    @SuppressWarnings("deprecation")
    void validateResumeOperationIsNotPresent(Method method) {
        // Use the fully-qualified class name as javac will throw deprecation warnings on imports when the class is
        // marked as deprecated.
        if (method.isAnnotationPresent(com.azure.core.annotation.ResumeOperation.class)) {
            throw LOGGER.logExceptionAsError(new IllegalStateException("'ResumeOperation' isn't supported."));
        }
    }

    static Context mergeRequestOptionsContext(Context context, RequestOptions options) {
        if (options == null) {
            return context;
        }

        Context optionsContext = options.getContext();
        if (optionsContext != null && optionsContext != Context.NONE) {
            context = CoreUtils.mergeContexts(context, optionsContext);
        }

        return context;
    }

    static BinaryData validateLengthSync(final HttpRequest request) {
        final BinaryData binaryData = request.getContent();
        if (binaryData == null) {
            return binaryData;
        }

        final long expectedLength = Long.parseLong(request.getHeaders().getValue("Content-Length"));
        Long length = binaryData.getLength();

        BinaryDataContent bdc = BinaryDataHelper.getContent(binaryData);
       if (length == null) {
            if (bdc instanceof FluxByteBufferContent) {
                throw new IllegalStateException("Flux Byte Buffer is not supported in Synchronous Rest Proxy.");
            } else if (bdc instanceof InputStreamContent) {
                InputStreamContent inputStreamContent = ((InputStreamContent) bdc);
                InputStream inputStream = inputStreamContent.toStream();
                LengthValidatingInputStream lengthValidatingInputStream =
                    new LengthValidatingInputStream(inputStream, expectedLength);
                return BinaryData.fromStream(lengthValidatingInputStream);
            } else  {
                byte[] b = (bdc).toBytes();
                long len = b.length;
                if (len > expectedLength) {
                    throw new UnexpectedLengthException(String.format(BODY_TOO_LARGE,
                        len, expectedLength), len, expectedLength);
                }
                return BinaryData.fromBytes(b);
            }
        } else {
           if (length > expectedLength) {
               throw new UnexpectedLengthException(String.format(BODY_TOO_LARGE,
                   length, expectedLength), length, expectedLength);
           }
           return binaryData;
       }
    }


    private static Exception instantiateUnexpectedException(final UnexpectedExceptionInformation exception,
        final HttpResponse httpResponse, final byte[] responseContent, final Object responseDecodedContent) {
        final int responseStatusCode = httpResponse.getStatusCode();
        final String contentType = httpResponse.getHeaderValue("Content-Type");
        final String bodyRepresentation;
        if ("application/octet-stream".equalsIgnoreCase(contentType)) {
            bodyRepresentation = "(" + httpResponse.getHeaderValue("Content-Length") + "-byte body)";
        } else {
            bodyRepresentation = responseContent == null || responseContent.length == 0
                ? "(empty body)"
                : "\"" + new String(responseContent, StandardCharsets.UTF_8) + "\"";
        }

        Exception result;
        try {
            final Constructor<? extends HttpResponseException> exceptionConstructor = exception.getExceptionType()
                .getConstructor(String.class, HttpResponse.class, exception.getExceptionBodyType());
            result = exceptionConstructor.newInstance("Status code " + responseStatusCode + ", " + bodyRepresentation,
                httpResponse, responseDecodedContent);
        } catch (ReflectiveOperationException e) {
            String message = "Status code " + responseStatusCode + ", but an instance of "
                + exception.getExceptionType().getCanonicalName() + " cannot be created."
                + " Response body: " + bodyRepresentation;

            result = new IOException(message, e);
        }
        return result;
    }

    /**
     * Create a publisher that (1) emits error if the provided response {@code decodedResponse} has 'disallowed status
     * code' OR (2) emits provided response if it's status code ia allowed.
     *
     * 'disallowed status code' is one of the status code defined in the provided SwaggerMethodParser or is in the int[]
     * of additional allowed status codes.
     *
     * @param decodedResponse The HttpResponse to check.
     * @param methodParser The method parser that contains information about the service interface method that initiated
     * the HTTP request.
     * @return An async-version of the provided decodedResponse.
     */
    private HttpDecodedResponse ensureExpectedStatus(final HttpDecodedResponse decodedResponse,
        final SwaggerMethodParser methodParser, RequestOptions options) {
        final int responseStatusCode = decodedResponse.getSourceResponse().getStatusCode();

        // If the response was success or configured to not return an error status when the request fails, return the
        // decoded response.
        if (methodParser.isExpectedResponseStatusCode(responseStatusCode)
            || (options != null && options.getErrorOptions().contains(ErrorOptions.NO_THROW))) {
            return decodedResponse;
        }

        // Otherwise, the response wasn't successful and the error object needs to be parsed.
        byte[] responseBytes = decodedResponse.getSourceResponse().getContent().toBytes();
        if (responseBytes == null || responseBytes.length == 0) {
            //  No body, create exception empty content string no exception body object.
            throw new RuntimeException(instantiateUnexpectedException(
                methodParser.getUnexpectedException(responseStatusCode), decodedResponse.getSourceResponse(),
                null, null));
        } else {
            Object decodedBody =  decodedResponse.getDecodedBodySync(responseBytes);
            // create exception with un-decodable content string and without exception body object.
            throw new RuntimeException(instantiateUnexpectedException(
                methodParser.getUnexpectedException(responseStatusCode),
                decodedResponse.getSourceResponse(), responseBytes, decodedBody));
        }
    }

    /**
     * Create an instance of the default serializer.
     *
     * @return the default serializer
     */
    private static SerializerAdapter createDefaultSerializer() {
        return JacksonAdapter.createDefaultSerializerAdapter();
    }

    /**
     * Create the default HttpPipeline.
     *
     * @return the default HttpPipeline
     */
    private static HttpPipeline createDefaultPipeline() {
        List<HttpPipelinePolicy> policies = new ArrayList<>();
        policies.add(new UserAgentPolicy());
        policies.add(new RetryPolicy());
        policies.add(new CookiePolicy());

        return new HttpPipelineBuilder()
            .policies(policies.toArray(new HttpPipelinePolicy[0]))
            .build();
    }

    /**
     * Create a proxy implementation of the provided Swagger interface.
     *
     * @param swaggerInterface the Swagger interface to provide a proxy implementation for
     * @param <A> the type of the Swagger interface
     * @return a proxy implementation of the provided Swagger interface
     */
    public static <A> A create(Class<A> swaggerInterface) {
        return create(swaggerInterface, createDefaultPipeline(), createDefaultSerializer());
    }

    /**
     * Create a proxy implementation of the provided Swagger interface.
     *
     * @param swaggerInterface the Swagger interface to provide a proxy implementation for
     * @param httpPipeline the HttpPipelinePolicy and HttpClient pipeline that will be used to send Http requests
     * @param <A> the type of the Swagger interface
     * @return a proxy implementation of the provided Swagger interface
     */
    public static <A> A create(Class<A> swaggerInterface, HttpPipeline httpPipeline) {
        return create(swaggerInterface, httpPipeline, createDefaultSerializer());
    }

    /**
     * Create a proxy implementation of the provided Swagger interface.
     *
     * @param swaggerInterface the Swagger interface to provide a proxy implementation for
     * @param httpPipeline the HttpPipelinePolicy and HttpClient pipline that will be used to send Http requests
     * @param serializer the serializer that will be used to convert POJOs to and from request and response bodies
     * @param <A> the type of the Swagger interface.
     * @return a proxy implementation of the provided Swagger interface
     */
    @SuppressWarnings("unchecked")
    public static <A> A create(Class<A> swaggerInterface, HttpPipeline httpPipeline, SerializerAdapter serializer) {
        final SwaggerInterfaceParser interfaceParser = new SwaggerInterfaceParser(swaggerInterface, serializer);
        final SyncRestProxy restProxy = new SyncRestProxy(httpPipeline, serializer, interfaceParser);
        return (A) Proxy.newProxyInstance(swaggerInterface.getClassLoader(), new Class<?>[]{swaggerInterface},
            restProxy);
    }
}

Sync Stack Flow

image

KV SDK

image

KV Service Sync Interface

@Host("{url}")
@ServiceInterface(name = "KeyVaultSecrets")
interface SecretServiceSync {

    @Put("secrets/{secret-name}")
    @ExpectedResponses({200})
    @UnexpectedResponseExceptionType(code = {400}, value = ResourceModifiedException.class)
    @UnexpectedResponseExceptionType(HttpResponseException.class)
    Response<KeyVaultSecret> setSecret(@HostParam("url") String url,
                                             @PathParam("secret-name") String secretName,
                                             @QueryParam("api-version") String apiVersion,
                                             @HeaderParam("accept-language") String acceptLanguage,
                                             @BodyParam("application/json") SecretRequestParameters parameters,
                                             @HeaderParam("Content-Type") String type,
                                             Context context);
}

Stack Trace with Sync over Async

[main] INFO com.azure.core.implementation.jackson.JacksonVersion - Package versions: jackson-annotations=2.13.1, jackson-core=2.13.1, jackson-databind=2.13.1, jackson-dataformat-xml=2.13.1, jackson-datatype-jsr310=2.13.1, azure-core=1.27.0-beta.1, Troubleshooting version conflicts: https://aka.ms/azsdk/java/dependency/troubleshoot
[reactor-http-kqueue-2] INFO com.azure.identity.implementation.HttpPipelineAdapter - [Authenticated account] Client ID: 27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f, Tenant ID: c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e, User Principal Name: 72f988bf-86f1-41af-91ab-2d7cd011db47, Object ID (user): No User Principal Name available.)
[ForkJoinPool.commonPool-worker-9] INFO com.azure.identity.ClientSecretCredential - Azure Identity => getToken() result for scopes [https://vault.azure.net/.default]: SUCCESS
[ForkJoinPool.commonPool-worker-9] INFO com.azure.core.implementation.AccessTokenCache - Acquired a new access token.
[reactor-http-kqueue-1] WARN com.azure.security.keyvault.secrets.SecretAsyncClient - Failed to set secret - test
Status code 403, "{"error":{"code":"Forbidden","message":"The user, group or application 'appid=27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f;oid=c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e;iss=https://sts.windows.net/72f988bf-86f1-41af-91ab-2d7cd011db47/' does not have secrets set permission on key vault 'vigera-test-secrets;location=westus2'. For help resolving this issue, please see https://go.microsoft.com/fwlink/?linkid=2125287","innererror":{"code":"AccessDenied"}}}"
Exception in thread "main" com.azure.core.exception.HttpResponseException: Status code 403, "{"error":{"code":"Forbidden","message":"The user, group or application 'appid=27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f;oid=c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e;iss=https://sts.windows.net/72f988bf-86f1-41af-91ab-2d7cd011db47/' does not have secrets set permission on key vault 'vigera-test-secrets;location=westus2'. For help resolving this issue, please see https://go.microsoft.com/fwlink/?linkid=2125287","innererror":{"code":"AccessDenied"}}}"
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.azure.core.http.rest.RestProxy.instantiateUnexpectedException(RestProxy.java:390)
	at com.azure.core.http.rest.RestProxy.lambda$ensureExpectedStatus$7(RestProxy.java:445)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:337)
	at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:354)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
	at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onSubscribe(MonoCacheTime.java:293)
	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:192)
	at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2068)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2664)
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
	at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:213)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:702)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1371)
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1245)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
	at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:383)
	at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1707)
		at com.test.demo.Test.main(Test.java:55)

Stack Trace with Sync Stack

[main] INFO com.azure.core.implementation.jackson.JacksonVersion - Package versions: jackson-annotations=2.13.1, jackson-core=2.13.1, jackson-databind=2.13.1, jackson-dataformat-xml=2.13.1, jackson-datatype-jsr310=2.13.1, azure-core=1.27.0-beta.1, Troubleshooting version conflicts: https://aka.ms/azsdk/java/dependency/troubleshoot
[reactor-http-kqueue-2] INFO com.azure.identity.implementation.HttpPipelineAdapter - [Authenticated account] Client ID: 27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f, Tenant ID: c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e, User Principal Name: 72f988bf-86f1-41af-91ab-2d7cd011db47, Object ID (user): No User Principal Name available.)
[ForkJoinPool.commonPool-worker-9] INFO com.azure.identity.ClientSecretCredential - Azure Identity => getToken() result for scopes [https://vault.azure.net/.default]: SUCCESS
[ForkJoinPool.commonPool-worker-9] INFO com.azure.core.implementation.AccessTokenCache - Acquired a new access token.
Exception in thread "main" java.lang.RuntimeException: com.azure.core.exception.HttpResponseException: Status code 403, "{"error":{"code":"Forbidden","message":"The user, group or application 'appid=27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f;oid=c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e;iss=https://sts.windows.net/72f988bf-86f1-41af-91ab-2d7cd011db47/' does not have secrets set permission on key vault 'vigera-test-secrets;location=westus2'. For help resolving this issue, please see https://go.microsoft.com/fwlink/?linkid=2125287","innererror":{"code":"AccessDenied"}}}"
	at com.azure.core.http.rest.SyncRestProxy.ensureExpectedStatus(SyncRestProxy.java:436)
	at com.azure.core.http.rest.SyncRestProxy.handleRestReturnType(SyncRestProxy.java:564)
	at com.azure.core.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:137)
	at com.azure.security.keyvault.secrets.$Proxy4.setSecret(Unknown Source)
	at com.azure.security.keyvault.secrets.SecretClient.setSecretWithResponse(SecretClient.java:189)
	at com.azure.security.keyvault.secrets.SecretClient.setSecret(SecretClient.java:155)
	at com.test.demo.Test.main(Test.java:47)
Caused by: com.azure.core.exception.HttpResponseException: Status code 403, "{"error":{"code":"Forbidden","message":"The user, group or application 'appid=27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f;oid=c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e;iss=https://sts.windows.net/72f988bf-86f1-41af-91ab-2d7cd011db47/' does not have secrets set permission on key vault 'vigera-test-secrets;location=westus2'. For help resolving this issue, please see https://go.microsoft.com/fwlink/?linkid=2125287","innererror":{"code":"AccessDenied"}}}"
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.azure.core.http.rest.SyncRestProxy.instantiateUnexpectedException(SyncRestProxy.java:391)
	... 7 more

[main] INFO com.azure.core.implementation.jackson.JacksonVersion - Package versions: jackson-annotations=2.13.1, jackson-core=2.13.1, jackson-databind=2.13.1, jackson-dataformat-xml=2.13.1, jackson-datatype-jsr310=2.13.1, azure-core=1.27.0-beta.1, Troubleshooting version conflicts: https://aka.ms/azsdk/java/dependency/troubleshoot
[reactor-http-kqueue-2] INFO com.azure.identity.implementation.HttpPipelineAdapter - [Authenticated account] Client ID: 27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f, Tenant ID: c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e, User Principal Name: 72f988bf-86f1-41af-91ab-2d7cd011db47, Object ID (user): No User Principal Name available.)
[ForkJoinPool.commonPool-worker-9] INFO com.azure.identity.ClientSecretCredential - Azure Identity => getToken() result for scopes [https://vault.azure.net/.default]: SUCCESS
[ForkJoinPool.commonPool-worker-9] INFO com.azure.core.implementation.AccessTokenCache - Acquired a new access token.
[reactor-http-kqueue-1] WARN com.azure.security.keyvault.secrets.SecretAsyncClient - Failed to set secret - test
Exception in thread "main" java.lang.RuntimeException: com.azure.core.exception.HttpResponseException: Status code 403, "{"error":{"code":"Forbidden","message":"The user, group or application 'appid=27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f;oid=c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e;iss=https://sts.windows.net/72f988bf-86f1-41af-91ab-2d7cd011db47/' does not have secrets set permission on key vault 'vigera-test-secrets;location=westus2'. For help resolving this issue, please see https://go.microsoft.com/fwlink/?linkid=2125287","innererror":{"code":"AccessDenied"}}}"
	at com.azure.core.http.rest.SyncRestProxy.ensureExpectedStatus(SyncRestProxy.java:436)
	at com.azure.core.http.rest.SyncRestProxy.handleRestReturnType(SyncRestProxy.java:564)
	at com.azure.core.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:137)
	at com.azure.security.keyvault.secrets.$Proxy4.setSecret(Unknown Source)
	at com.azure.security.keyvault.secrets.SecretClient.setSecretWithResponse(SecretClient.java:189)
	at com.azure.security.keyvault.secrets.SecretClient.setSecret(SecretClient.java:155)
	at com.test.demo.Test.main(Test.java:47)
Caused by: com.azure.core.exception.HttpResponseException: Status code 403, "{"error":{"code":"Forbidden","message":"The user, group or application 'appid=27d85698-74dd-4dd9-9f2d-8ae0e0e0ac4f;oid=c21a53bd-e8d7-412c-b60f-19e9f7fb0c7e;iss=https://sts.windows.net/72f988bf-86f1-41af-91ab-2d7cd011db47/' does not have secrets set permission on key vault 'vigera-test-secrets;location=westus2'. For help resolving this issue, please see https://go.microsoft.com/fwlink/?linkid=2125287","innererror":{"code":"AccessDenied"}}}"
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.azure.core.http.rest.SyncRestProxy.instantiateUnexpectedException(SyncRestProxy.java:391)
	... 7 more

image

Storage Stack Trace with Sync Rest Proxy

java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: com.azure.core.exception.UnexpectedLengthException: Request body emitted 10239 bytes, less than the expected10240bytes.
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:259)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
at com.azure.storage.App.main(App.java:35)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: com.azure.core.exception.UnexpectedLengthException: Request body emitted 10239 bytes, less than the expected10240bytes.
at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:237)
... 3 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: com.azure.core.exception.UnexpectedLengthException: Request body emitted 10239 bytes, less than the expected10240bytes.
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
... 5 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: com.azure.core.exception.UnexpectedLengthException: Request body emitted 10239 bytes, less than the expected10240bytes.
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$15(PerfStressProgram.java:236)
at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.RuntimeException: com.azure.core.exception.UnexpectedLengthException: Request body emitted 10239 bytes, less than the expected10240bytes.
at com.azure.core.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:148)
at com.sun.proxy.$Proxy37.upload(Unknown Source)
at com.azure.storage.blob.implementation.BlockBlobsImpl.uploadWithResponseSync(BlockBlobsImpl.java:571)
at com.azure.storage.blob.specialized.BlockBlobAsyncClient.uploadWithResponseSync(BlockBlobAsyncClient.java:378)
at com.azure.storage.blob.specialized.BlockBlobClient.uploadWithResponse(BlockBlobClient.java:419)
at com.azure.storage.blob.specialized.BlockBlobClient.uploadWithResponse(BlockBlobClient.java:363)
at com.azure.storage.blob.specialized.BlockBlobClient.upload(BlockBlobClient.java:297)
at com.azure.storage.blob.perf.UploadBlockBlobTest.run(UploadBlockBlobTest.java:31)
at com.azure.perf.test.core.PerfStressProgram.runLoop(PerfStressProgram.java:285)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$14(PerfStressProgram.java:236)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
... 5 more
Caused by: com.azure.core.exception.UnexpectedLengthException: Request body emitted 10239 bytes, less than the expected10240bytes.
at com.azure.core.implementation.util.LengthValidatingInputStream.validateLength(LengthValidatingInputStream.java:115)
at com.azure.core.implementation.util.LengthValidatingInputStream.read(LengthValidatingInputStream.java:48)
at com.azure.core.util.FluxUtil.lambda$toFluxByteBuffer$4(FluxUtil.java:252)
at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:265)
at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:207)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:253)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87)
at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:84)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:101)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122)
at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:336)
at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:69)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:424)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:671)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:183)
at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:439)
at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:62)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:412)
at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:211)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
at reactor.netty.tcp.SslProvider$SslReadHandler.userEventTriggered(SslProvider.java:828)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
at io.netty.handler.ssl.SslHandler.setHandshakeSuccess(SslHandler.java:1836)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1358)
at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1245)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1285)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.azure.core.http.HttpClient.sendSynchronously(HttpClient.java:42)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:78)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.policy.HttpLoggingPolicy.processSynchronously(HttpLoggingPolicy.java:128)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.storage.common.policy.RequestRetryPolicy.attemptSync(RequestRetryPolicy.java:277)
at com.azure.storage.common.policy.RequestRetryPolicy.processSynchronously(RequestRetryPolicy.java:61)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
at com.azure.core.http.HttpPipeline.sendSynchronously(HttpPipeline.java:139)
at com.azure.core.http.HttpPipeline.sendSynchronously(HttpPipeline.java:112)
at com.azure.core.http.rest.SyncRestProxy.send(SyncRestProxy.java:102)
at com.azure.core.http.rest.SyncRestProxy.invoke(SyncRestProxy.java:140)
at com.sun.proxy.$Proxy37.upload(Unknown Source)
at com.azure.storage.blob.implementation.BlockBlobsImpl.uploadWithResponseSync(BlockBlobsImpl.java:571)
at com.azure.storage.blob.specialized.BlockBlobAsyncClient.uploadWithResponseSync(BlockBlobAsyncClient.java:378)
at com.azure.storage.blob.specialized.BlockBlobClient.uploadWithResponse(BlockBlobClient.java:419)
at com.azure.storage.blob.specialized.BlockBlobClient.uploadWithResponse(BlockBlobClient.java:363)
at com.azure.storage.blob.specialized.BlockBlobClient.upload(BlockBlobClient.java:297)
at com.azure.storage.blob.perf.UploadBlockBlobTest.run(UploadBlockBlobTest.java:31)
at com.azure.perf.test.core.PerfStressProgram.runLoop(PerfStressProgram.java:285)
at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$14(PerfStressProgram.java:236)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

Perf Data

Storage Upload API - Throughput

OkHttp Client

image

image

Netty Client

image

image

Storage Upload API - Memory Usage

OkHttp Client

image image

Netty Client

image image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment