Sync stack prototype

First Prototype

Overview

This document presents an attempt to build a fully synchronous stack and make it work with a couple of simple Storage APIs. The goal is to achieve fully synchronous execution, or get as close as possible (given the nature of the underlying HTTP client), and then compare its runtime behavior with the existing solution.

Synchronous stack bodge

The synchronous stack has been bodged through all the layers, starting from the high-level client down to the HTTP client call, or terminated right above the HTTP client (Netty). The full implementation can be found in this commit.

The bodge is based on a naive attempt to abstract Reactor by introducing a layer of operators that map mostly 1:1 to the Reactor operators encountered while plowing through the layers of code. These operators add the ability to execute synchronously in addition to reactive chain construction, and let the upper layer choose which option is materialized in order to fulfill the API contract. This approach has been chosen to ensure that the sync stack has similar computational complexity to its asynchronous counterpart, which is important for benchmarks.
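
For illustration, a minimal sketch of the operator shape described above (not the actual prototype code; the getAsync name and the map operator shown here are assumptions based on the description, while getSync matches the stack trace further down):

import reactor.core.publisher.Mono;

import java.util.function.Function;

// Hypothetical shape of a sync-async operator: each operator can either be
// materialized synchronously on the caller thread or composed into a Mono chain.
interface SingleResult<T> {
    T getSync();          // synchronous materialization
    Mono<T> getAsync();   // reactive materialization (name assumed for this sketch)

    // A 1:1 counterpart of Mono.map; the upper layer decides which path is materialized.
    default <R> SingleResult<R> map(Function<T, R> mapper) {
        SingleResult<T> upstream = this;
        return new SingleResult<R>() {
            @Override
            public R getSync() {
                return mapper.apply(upstream.getSync());
            }

            @Override
            public Mono<R> getAsync() {
                return upstream.getAsync().map(mapper);
            }
        };
    }
}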

The APIs that have been tried are BlobContainerClient.create(), BlobServiceClient.getProperties() and BlobClient.getProperties(). BlobServiceClient.getProperties() and BlobClient.getProperties() have been used for benchmarking because they perform better at scale, both with Azurite and with the real service. The difference between BlobServiceClient.getProperties() and BlobClient.getProperties() is that the former has a body in the response and the latter has only headers.

Ingredients

  1. A naive abstraction over Reactor introducing a sync call stack - SingleResult and MultiResult - in addition to being able to create a reactive chain, they also offer a sync path that computes T or Stream<T>, which are the closest sync equivalents to Mono<T> and Flux<T>.
  2. A manually altered generated interface, like SingleResult getProperties().
  3. A public BlobClient.getProperties() calling directly into the sync stack here.
  4. Changes in RestProxy and its surroundings to make sure the call stack from here to the HTTP client call uses the new sync-async operators.
  5. Synchronously call OkHttp or the JDK HTTP client, OR block right after calling Netty.

Synchronousness

This is an example stack trace when calling into OkHttp.

	at okhttp3.internal.connection.RealCall.execute(RealCall.kt:148)
	at com.azure.core.http.okhttp.OkHttpAsyncHttpClient.lambda$send2$3(OkHttpAsyncHttpClient.java:89)
	at com.azure.core.syncasync.SyncAsyncSingleResult.getSync(SyncAsyncSingleResult.java:22)
	at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
	at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
	at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
	at com.azure.core.syncasync.DeferredSingleResult.getSync(DeferredSingleResult.java:18)
	at com.azure.core.syncasync.DoAsyncOnlySingleResult.getSync(DoAsyncOnlySingleResult.java:20)
	at com.azure.core.syncasync.DelaySingleResult.getSync(DelaySingleResult.java:27)
	at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
	at com.azure.core.syncasync.OnErrorResumeSingleResult.getSync(OnErrorResumeSingleResult.java:21)
	at com.azure.core.syncasync.DeferredSingleResult.getSync(DeferredSingleResult.java:18)
	at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
	at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
	at com.azure.core.http.rest.RestProxy.lambda$handleRestReturnType$17(RestProxy.java:591)
	at com.azure.core.syncasync.SyncAsyncSingleResult.getSync(SyncAsyncSingleResult.java:22)
	at com.azure.core.syncasync.FlatMapSingleResult.getSync(FlatMapSingleResult.java:20)
	at com.azure.core.syncasync.MappingSingleResult.getSync(MappingSingleResult.java:20)
	at com.azure.storage.blob.BlobServiceClient.getPropertiesWithResponse(BlobServiceClient.java:344)
	at com.azure.storage.blob.BlobServiceClient.getProperties(BlobServiceClient.java:317)
	at com.azure.storage.App.main(App.java:95)

Note that the stack trace is as horrible as Reactor's traces, but at least it provides insight into what's happening from the API until it hits the HTTP layer. Creating an imperative-style SyncRestProxy might be a better way to improve here, i.e. be imperative below the generated APIs, since we have one core that lights up hundreds of APIs. For logic above generated code, these new operators (or something equivalent) might be useful to minimize code duplication.

OkHttp

The OkHttp client offers Call.execute(), which is fully synchronous and uses the caller thread to do the work. This is then reflected in the benchmark results.
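
For reference, this is what the fully synchronous OkHttp call path looks like (standalone OkHttp usage, not the azure-core adapter; the URL is a placeholder):

import java.io.IOException;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class OkHttpSyncCall {
    public static void main(String[] args) throws IOException {
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url("https://example.com").build();
        // Call.execute() performs the request on the caller thread; no dispatcher hand-off.
        try (Response response = client.newCall(request).execute()) {
            System.out.println(response.code());
        }
    }
}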

Netty

We're using Reactor Netty, but even with raw Netty we'd be facing sync-over-async at some point. For the purpose of this experiment the async stack is blocked right in NettyAsyncHttpClient. However, it might be worth considering an implementation where we talk to Netty directly without involving Reactor on sync paths.
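
A minimal sketch of that "block right above the HTTP client" approach, assuming the azure-core-http-netty client and a placeholder URL:

import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;

public class NettySyncOverAsync {
    public static void main(String[] args) {
        HttpClient client = new NettyAsyncHttpClientBuilder().build();
        HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.com");
        // Sync-over-async: the reactive send is materialized by blocking the caller thread.
        try (HttpResponse response = client.send(request).block()) {
            System.out.println(response.getStatusCode());
        }
    }
}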

JDK Http Client

The JDK Http Client offers a synchronous HttpClient.send() API, which can materialize the response body as, for example, an InputStream or byte[].

However, the JDK Http Client internally calls into its async implementation and blocks, see here.
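
For reference, the synchronous JDK 11+ API looks like this (placeholder URL):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JdkSyncCall {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://example.com")).GET().build();
        // send() is synchronous at the API surface, but internally delegates to the
        // async implementation and waits for it to complete.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}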

Testing

Perf

The perf runs are based on azure-storage-perf with the following alterations.

  1. Added GetAccountPropertiesTest and GetBlobPropertiesTest.
  2. The blob and container names have been kept constant across all runs to minimize variance coming from the service allocating these resources on different hardware. See here and here.
  3. The HttpClient instance is made a global singleton to reflect the recommended usage pattern. See here.
  4. The OkHttp client requires customization to allow a higher degree of parallelism for the sync-over-async runs. See here. It grossly underperforms with default settings. This problem does not appear with fully sync calls.
  5. Two versions of the azure-storage-perf fat jar have been prepared: one with the bodged sync stack and one using the latest released packages. See here.

The runs have been performed in the West US 2 region on a Standard DS3 v2 (4 vCPUs, 14 GiB memory) VM running Ubuntu 18, which was later changed to D4ds v5 (4 vCPUs, 16 GiB memory) for upload scenarios to ensure sufficient network bandwidth.

Fifteen runs have been conducted for each combination of parameters. The top 10 results have been taken for each combination to rule out grossly underperforming outliers (fallacies of distributed computing...). The tables below present the average with standard deviation (AVG±STDDEV) and the MAX out of these samples.

The heap size has been fixed to 2 GB to minimize possible variance coming from resizing.

Script used to run samples (this script is evolving):

#!/bin/bash

export STORAGE_CONNECTION_STRING="REDACTED"

for apiname in getblobproperties
do
  for httpclient in okhttp netty jdk
  do
    for parallel in 1 2 10 30
    do
      for syncasync in full-sync sync-over-async
      do
        for counter in {1..15}
        do
          echo "Starting $apiname $httpclient $syncasync parallel=$parallel $counter"
          set -x
          java -Xms2g -Xmx2g -D"perf.httpclient=$httpclient" -jar azure-storage-perf-$syncasync.jar $apiname --sync --parallel $parallel --warmup 60 --duration 120
          set +x
          echo "Finished $apiname $httpclient $syncasync parallel=$parallel $counter"
          sleep 60
        done
      done
    done
  done
done

for apiname in getaccountproperties
do
  for httpclient in okhttp netty jdk
  do
    for parallel in 1 2 10 20
    do
      for syncasync in full-sync sync-over-async
      do
        for counter in {1..15}
        do
          echo "Starting $apiname $httpclient $syncasync parallel=$parallel $counter"
          set -x
          java -Xms2g -Xmx2g -D"perf.httpclient=$httpclient" -jar azure-storage-perf-$syncasync.jar $apiname --sync --parallel $parallel --warmup 60 --duration 120
          set +x
          echo "Finished $apiname $httpclient $syncasync parallel=$parallel $counter"
          sleep 60
        done
      done
    done
  done
done

Results

BlobClient.getProperties()

LRS StorageV2 (general purpose v2)

Executed on DS3 v2.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 294.8±12.9 | 275.5±17.9 | 239.0±11.3 | 239.6±14.0 | 288.1±21.8 | 278.8±15.4 |
| 2 | 595.3±32.6 | 565.7±15.2 | 536.5±29.9 | 477.2±50.7 | 596.7±17.8 | 575.6±16.4 |
| 10 | 2952.4±104.0 | 2814.5±101.6 | 2713.8±72.1 | 2682.2±73.1 | 2926.5±67.9 | 2731.9±83.0 |
| 30 | 9132.1±337.1 | 7605.5±309.1 | 8101.5±162.4 | 7736.1±115.2 | 7964.9±184.0 | 7838.1±133.5 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 313.1 | 308.9 | 256.4 | 270.3 | 334.2 | 303.9 |
| 2 | 642.1 | 591.4 | 599.0 | 596.2 | 629.8 | 610.4 |
| 10 | 3193.1 | 3042.3 | 2806.3 | 2785.6 | 3043.9 | 2874.9 |
| 30 | 9866.8 | 7942.0 | 8285.7 | 7948.7 | 8279.2 | 8082.8 |

LRS Premium BlockBlobStorage

Executed on DS3 v2.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 560.3±7.8 | 500.2±3.2 | 488.5±11.0 | 475±4.7 | 512.1±10.4 | 468.1±5.5 |
| 2 | 1160.0±15.2 | 1022.43±12.5 | 1011.9±18.0 | 978.4±7.3 | 1043.3±11.8 | 949.5±20.9 |
| 10 | 5726.1±129.2 | 4979.8±49.8 | 4984.5±94.0 | 4697.2±48.3 | 5283.4±54.4 | 4818.1±59.1 |
| 30 | 14009.1±140.0 | 10326.7±107.7 | 11629.1±92.1 | 10732.3±88.9 | 10542.5±92.9 | 9933.6±114.8 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 573.8 | 504.7 | 507.7 | 482.81 | 535.4 | 478.3 |
| 2 | 1195.5 | 1034.7 | 1029.4 | 987.9 | 1059.2 | 985.1 |
| 10 | 5929.9 | 5070.7 | 5097.6 | 4792.3 | 5393.2 | 4913.9 |
| 30 | 14304.7 | 10553.2 | 11756.2 | 10866.1 | 10666.9 | 10119.6 |

BlobServiceClient.getProperties()

LRS StorageV2 (general purpose v2)

Executed on DS3 v2.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 258.6±13.6 | 238.4±23.6 | 224.7±13.4 | 228.0±10.0 | 267.7±18.1 | 224.2±8.2 |
| 2 | 540.7±34.4 | 486.4±28.9 | 472.5±15.8 | 439.8±21.7 | 492.0±35.2 | 492.2±18.0 |
| 10 | 2586.4±153.7 | 2384.1±99.6 | 2335.2±40.0 | 2375.5±57.4 | 2674.4±63.5 | 2578.6±50.6 |
| 20 | 4984.5±165.0* | 4653.4±126.7 | 4902.6±215.6 | 4420.4±191.5 | 4908.8±209.1 | 4667.2±125.8 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 282.3 | 282.5 | 251.3 | 249.0 | 304.5 | 256.7 |
| 2 | 609.3 | 531.7 | 513.6 | 493.4 | 544.8 | 529.6 |
| 10 | 2919.86 | 2557.7 | 2393.7 | 2479.6 | 2759.9 | 2662.3 |
| 20 | 5317.3* | 4913.9 | 5244.7 | 4715.5 | 5271.5 | 4895.2 |

* Full-sync OkHttp started to hit service throttling for BlobServiceClient.getProperties() at a concurrency of 20. Half of the runs were facing this issue, which required more sampling to get usable data.

LRS Premium BlockBlobStorage

Executed on DS3 v2.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 471.4±10.5 | 438.8±6.4 | 436.3±6.1 | 407.5±6.2 | 433.7±6.8 | 387.9±5.0 |
| 2 | 965.2±10.6 | 874.4±11.2 | 870.2±15.0 | 827.6±11.3 | 868.1±10.1 | 809.3±10.0 |
| 10 | 4725.2±43.1 | 4296.0±42.1 | 4190.5±72.0 | 4047.3±26.4 | 4291.2±52.6 | 3959.0±36.7 |
| 20 | 9318.0±74.6 | 7280.7±69.5 | 7996.1±69.5 | 7504.2±80.7 | 7570.3±32.7 | 6954.6±71.0 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| 1 | 483.6 | 446.0 | 447.2 | 415.8 | 452.5 | 395.9 |
| 2 | 977.9 | 886.3 | 888.8 | 846.6 | 888.3 | 829.0 |
| 10 | 4780.2 | 4364.1 | 4241.4 | 4091.4 | 4383.3 | 3992.3 |
| 20 | 9422.3 | 7396.1 | 8112.0 | 8627.2 | 7656.4 | 7060.5 |

BlockBlobClient.upload(InputStream data, long length, boolean overwrite)

The synchronous implementation is bodged here.

Note:

  • This isn't the best implementation, i.e. it doesn't avoid unnecessary InputStream->Stream conversions.
  • The JDK client has not been tested - the bodge seems to have a connection leak on this execution path that wasn't easy to track down in a short time.
  • The VM was changed from DS3 v2 to D4ds v5 because the network was getting saturated with larger payloads.

LRS Premium BlockBlobStorage 10 KB

Executed on DS3 v2.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 202.0±5.3 | 189.4±2.4 | 188.1±5.5 | 186.1±3.2 |
| 2 | 425.3±5.4 | 406.8±8.1 | 385.4±4.9 | 374.9±8.4 |
| 10 | 2145.8±25.7 | 2000.4±17.4 | 1975.6±34.4 | 1944.1±30.5 |
| 30 | 6092.3±66.6 | 5570.1±87.4 | 5758.0±84.1 | 5558.4±73.9 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 210.8 | 192.8 | 195.1 | 192.2 |
| 2 | 437.2 | 416.36 | 396.7 | 396.0 |
| 10 | 2189.7 | 2036.1 | 2032.7 | 1990.8 |
| 30 | 6199.6 | 5743.7 | 5950.5 | 5687.7 |

LRS Premium BlockBlobStorage 10 KB - second attempt

Executed on D4ds v5.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 220.4±3.4 | 216.2±5.4 | 205.2±2.9 | 205.7±4.0 |
| 2 | 450.2±8.4 | 435.5±7.4 | 419.2±7.8 | 418.0±6.1 |
| 10 | 2288.7±32.1 | 2187.7±22.8 | 2208.2±32.7 | 2163.6±54.6 |
| 20 | 4331.3±122.9 | 4292.3±90.6 | 4317.9±71.8 | 4261.4±33.6 |
| 30 | 6486.3±79.8 | 6319.2±75.8 | 6316.8±102.6 | 6298.0±112.3 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 227.1 | 227.6 | 209.2 | 212.0 |
| 2 | 471.6 | 452.5 | 432.2 | 428.0 |
| 10 | 2341.9 | 2237.0 | 2263.5 | 2234.7 |
| 20 | 4558.4 | 4456.1 | 4488.0 | 4306.0 |
| 30 | 6597.6 | 6516.1 | 6493.0 | 6467.8 |

LRS Premium BlockBlobStorage 20 KB

Executed on D4ds v5.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 208.3±4.2 | 201.3±5.8 | 206.0±5.0 | 205.5±1.9 |
| 2 | 435.7±7.6 | 423.3±7.2 | 421.2±12.2 | 411.8±4.0 |
| 10 | 2189.4±34.0 | 2118.0±24.8 | 2117.6±48.7 | 2165.2±35.7 |
| 20 | 4272.5±51.3 | 4207.4±50.7 | 4283.5±66.3 | 4235.8±78.6 |
| 30 | 6278.6±93.3 | 6073.4±125.6 | 6171.6±99.6 | 6274.1±144.7 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 214.4 | 213.6 | 212.3 | 207.5 |
| 2 | 449.1 | 431.8 | 435.8 | 418.0 |
| 10 | 2266.0 | 2158.6 | 2194.6 | 2213.0 |
| 20 | 4272.5 | 4207.4 | 4283.5 | 4235.8 |
| 30 | 6413.9 | 6323.4 | 6361.7 | 6450.1 |

LRS Premium BlockBlobStorage 30 KB

Executed on D4ds v5.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 202.1±4.0 | 201.8±5.0 | 203.9±7.3 | 203.1±2.8 |
| 2 | 424.4±8.3 | 415.1±13.9 | 408.8±2.7 | 404.1±7.8 |
| 10 | 2050.9±40.1 | 2082.9±36.0 | 2094.3±32.2 | 2081.2±38.8 |
| 20 | 4190.0±48.7 | 4156.3±60.3 | 4155.0±73.5 | 4186.8±36.1 |
| 30 | 5973.5±174.6 | 5861.6±144.8 | 5982.9±112.3 | 5851.5±92.8 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 207.7 | 212.1 | 217.6 | 207.7 |
| 2 | 434.7 | 432.6 | 413.7 | 417.8 |
| 10 | 2111.9 | 2142.1 | 2133.5 | 2141.3 |
| 20 | 4259.9 | 4224.1 | 4276.6 | 4265.4 |
| 30 | 6195.2 | 6091.4 | 6138.1 | 6044.5 |

LRS Premium BlockBlobStorage 50 KB

Executed on D4ds v5.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 187.3±3.5 | 183.7±2.1 | 185.9±5.6 | 186.3±5.7 |
| 2 | 378.9±8.5 | 373.3±5.3 | 379.3±7.5 | 369.9±14.8 |
| 10 | 1932.9±23.7 | 1887.5±26.1 | 1959.0±45.4 | 1909.1±49.3 |
| 20 | 3700.1±85.3 | 3703.4±34.9 | 3792.3±62.2 | 3785.0±59.3 |
| 30 | 5440.4±87.6 | 5294.9±47.6 | 5514.5±55.1 | 5504.6±96.6 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 192.9 | 187.0 | 197.7 | 195.2 |
| 2 | 395.4 | 384.8 | 394.8 | 403.9 |
| 10 | 1973.3 | 1929.7 | 2022.0 | 2008.1 |
| 20 | 3878.0 | 3760.9 | 3931.9 | 3871.0 |
| 30 | 5589.0 | 5396.3 | 5615.0 | 5705.8 |

LRS Premium BlockBlobStorage 100 KB

Executed on D4ds v5.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 158.0±4.6 | 153.8±2.9 | 160.3±4.6 | 162.2±2.0 |
| 2 | 314.7±7.9 | 312.0±7.6 | 324.8±5.8 | 316.9±9.8 |
| 10 | 1570.0±31.2 | 1577.9±19.1 | 1628.7±34.2 | 1638.9±25.9 |
| 20 | 3132.4±65.2 | 3033.6±77.1 | 3141.0±135.8 | 3140.4±61.5 |
| 30 | 4592.6±105.8 | 4479.0±88.5 | 4461.7±177.3 | 4643.7±120.0 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 167.9 | 160.0 | 167.1 | 166.2 |
| 2 | 328.7 | 321.8 | 337.9 | 333.5 |
| 10 | 1629.2 | 1615.7 | 1691.2 | 1688.2 |
| 20 | 3263.6 | 3160.7 | 3298.7 | 3226.1 |
| 30 | 4738.4 | 4649.6 | 4699.0 | 4805.1 |

LRS Premium BlockBlobStorage 500 KB

Executed on D4ds v5.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 52.2±2.2 | 53.9±1.1 | 59.6±1.1 | 57.8±1.6 |
| 2 | 112.3±4.6 | 105.1±7.1 | 114.4±5.9 | 110.0±7.2 |
| 10 | 591.3±7.0 | 572.3±10.5 | 606.2±8.3 | 607.3±8.9 |
| 20 | 1133.7±9.9 | 1130.5±12.2 | 1225.5±17.6 | 1223.7±16.5 |
| 30 | 1687.2±12.8 | 1643.7±16.1 | 1783.4±32.0 | 1824.2±34.5 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 56.1 | 55.5 | 61.7 | 61.3 |
| 2 | 117.5 | 114.4 | 120.6 | 119.1 |
| 10 | 602.7 | 596.5 | 627.3 | 621.1 |
| 20 | 1150.4 | 1153.9 | 1252.8 | 1241.3 |
| 30 | 1704.7 | 1670.3 | 1857.5 | 1868.2 |

LRS Premium BlockBlobStorage 1 MB

Executed on D4ds v5.

Avg

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 40.3±0.6 | 40.2±0.9 | 42.1±0.8 | 42.1±0.5 |
| 2 | 82.5±1.4 | 80.0±2.0 | 85.4±1.2 | 83.4±3.3 |
| 10 | 404.5±3.4 | 402.1±4.4 | 430.6±11.5 | 423.9±6.6 |
| 20 | 807.1±9.0 | 772.7±14.6 | 830.9±11.2 | 851.3±10.3 |
| 30 | 1152.0±8.5 | 1095.9±10.2 | 1258.0±13.7 | 1259.8±12.0 |

Max

| Concurrency | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 1 | 41.7 | 41.7 | 43.2 | 42.9 |
| 2 | 84.6 | 84.3 | 87.0 | 88.6 |
| 10 | 409.8 | 408.1 | 444.2 | 441.0 |
| 20 | 820.6 | 798.9 | 843.3 | 876.5 |
| 30 | 1170.5 | 1107.5 | 1274.0 | 1283.3 |

LRS Premium BlockBlobStorage - Comparison of various sizes at concurrency of 30

Executed on D4ds v5.

Avg

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 6486.3±79.8 | 6319.2±75.8 | 6316.8±102.6 | 6298.0±112.3 |
| 20 KB | 6278.6±93.3 | 6073.4±125.6 | 6171.6±99.6 | 6274.1±144.7 |
| 30 KB | 5973.5±174.6 | 5861.6±144.8 | 5982.9±112.3 | 5851.5±92.8 |
| 50 KB | 5440.4±87.6 | 5294.9±47.6 | 5514.5±55.1 | 5504.6±96.6 |
| 100 KB | 4592.6±105.8 | 4479.0±88.5 | 4461.7±177.3 | 4643.7±120.0 |
| 500 KB | 1687.2±12.8 | 1643.7±16.1 | 1783.4±32.0 | 1824.2±34.5 |
| 1 MB | 1152.0±8.5 | 1095.9±10.2 | 1258.0±13.7 | 1259.8±12.0 |

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 6597.6 | 6516.1 | 6493.0 | 6467.8 |
| 20 KB | 6413.9 | 6323.4 | 6361.7 | 6450.1 |
| 30 KB | 6195.2 | 6091.4 | 6138.1 | 6044.5 |
| 50 KB | 5589.0 | 5396.3 | 5615.0 | 5705.8 |
| 100 KB | 4738.4 | 4649.6 | 4699.0 | 4805.1 |
| 500 KB | 1704.7 | 1670.3 | 1857.5 | 1868.2 |
| 1 MB | 1170.5 | 1107.5 | 1274.0 | 1283.3 |

Test Proxy - Comparison of various sizes at concurrency of 30

Executed on D4ds v5. Proxy on D32ds v5.

Avg

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18371.3±199.2 | 13438.1±200.5 | 19272.7±215.3 | 17353.6±300.9 |
| 20 KB | 17058.1±219.7 | 12717.0±157.9 | 18344.5±204.9 | 16582.3±318.0 |
| 30 KB | 16009.6±109.5 | 12016.2±136.7 | 17662.3±207.8 | 15874.9±124.5 |
| 50 KB | 14065.9±86.9 | 10656.3±148.7 | 16532.8±340.5 | 15513.0±182.9 |
| 100 KB | 9647.3±52.0 | 7973.3±34.3 | 12392.6±135.6 | 11738.3±81.2 |
| 200 KB | 6346.3±41.3 | 5731.0±20.4 | 7240.5±3.2* | 7240.1±2.6* |

* - These runs seem to saturate network bandwidth, based on the VM egress metrics graph.

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18874.6 | 13897.4 | 19569.7 | 17896.4 |
| 20 KB | 17426.1 | 12954.7 | 18655.1 | 17221.4 |
| 30 KB | 16194.4 | 12380.7 | 18077.7 | 16047.8 |
| 50 KB | 14213.4 | 10921.4 | 17053.0 | 15799.7 |
| 100 KB | 9754.8 | 8037.5 | 12676.5 | 11857.2 |
| 200 KB | 6420.5 | 5781.0 | 7246.9* | 7245.3* |

* - These runs seem to saturate network bandwidth, based on the VM egress metrics graph.

Test Proxy - Comparison of various sizes at concurrency of 20

Executed on D4ds v5. Proxy on D32ds v5.

Avg

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18048.2±70.8 | 13444.3±131.0 | 18264.1±214.0 | 16709.5±283.6 |
| 20 KB | 16883.2±114.0 | 12414.5±122.2 | 17215.9±167.3 | 15978.4±161.7 |
| 30 KB | 16024.4±197.3 | 11830.5±124.8 | 16691.7±203.8 | 15592.0±198.6 |
| 50 KB | 14045.9±126.2 | 10498.2±167.1 | 15229.8±500.6 | 14783.9±93.0 |
| 100 KB | 9446.8±43.4 | 7816.9±71.8 | 11230.7±403.6 | 11325.3±162.6 |
| 200 KB | 6248.8±26.8 | 5528.1±14.8 | 7238.2±0.7* | 7238.3±0.5* |

* - These runs seem to saturate network bandwidth, based on the VM egress metrics graph.

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 18217.0 | 13741.9 | 18668.3 | 17248.0 |
| 20 KB | 17155.4 | 12573.5 | 17665.8 | 16311.6 |
| 30 KB | 16293.8 | 12006.0 | 17064.5 | 16028.3 |
| 50 KB | 14349.8 | 10861.3 | 16093.1 | 14955.4 |
| 100 KB | 9532.9 | 7986.6 | 11779.8 | 11687.1 |
| 200 KB | 6287.4 | 5553.2 | 7239.6* | 7239.4* |

* - These runs seem to saturate network bandwidth, based on the VM egress metrics graph.

Test Proxy - Comparison of various sizes at concurrency of 10

Executed on D4ds v5. Proxy on D32ds v5.

Avg

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 16514.0±82.8 | 12666.8±134.6 | 15313.6±174.3 | 14050.4±193.7 |
| 20 KB | 15563.7±170.4 | 11932.7±196.8 | 14242.2±139.2 | 13767.6±146.4 |
| 30 KB | 14856.7±229.6 | 11419.9±104.9 | 13890.6±205.7 | 13246.9±179.2 |
| 50 KB | 13405.8±105.0 | 10240.0±72.6 | 13255.0±222.3 | 12691.9±116.8 |
| 100 KB | 9032.7±73.0 | 7406.9±70.0 | 9763.1±96.7 | 9390.1±68.1 |
| 200 KB | 5937.5±31.6 | 5141.2±19.5 | 7233.4±8.1 | 7083.8±60.0 |

Max

| Size | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async |
| --- | --- | --- | --- | --- |
| 10 KB | 16663.5 | 13019.6 | 15721.1 | 14402.0 |
| 20 KB | 15810.7 | 12299.2 | 14434.0 | 14082.5 |
| 30 KB | 15087.7 | 11565.6 | 14203.2 | 13527.0 |
| 50 KB | 13542.6 | 10380.6 | 13686.0 | 12858.7 |
| 100 KB | 9203.0 | 7528.4 | 9961.5 | 9499.3 |
| 200 KB | 5995.2 | 5188.3 | 7243.3 | 7182.5 |

Observations

  1. Results mostly align at low concurrency, i.e. 1 and 2.
  2. The higher the concurrency, the larger the gap in favor of the sync stack with smaller payloads.
  3. A true top-to-bottom sync stack (OkHttp) outperforms the rest at high concurrency with smaller payloads.
  4. The gain from the sync stack diminishes as payload size grows. The workload becomes more I/O bound and hopping between threads plays a lesser role there.
  5. Pushing sync-over-async down (Netty, JDK) brings an improvement that's more visible at higher concurrency with smaller payloads.
  6. Netty seems to be the better choice for larger payloads. I/O performance plays a larger role there, and Netty seems to handle it better.
  7. OkHttp seems to be the better choice for small payloads. Its lightweight design and reduced context switching favor OkHttp for this type of workload.
  8. OkHttp grossly underperforms in sync-over-async runs with default settings (not recorded, but it was getting stuck below 2k tps even though concurrency was increased). We might want to tweak these defaults; see the sketch after this list.
  9. OkHttp has a long (a minute or more) shutdown time after the last sync-over-async transaction, i.e. it prevents process shutdown until its default executor scales back to zero. We might want to tweak this default.
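
As an illustration of the kind of tweaking meant in points 8 and 9, a sketch of tuning OkHttp's Dispatcher (the limits and the explicit executor are assumptions for the sketch, not the settings used in the benchmark customization):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;

public class OkHttpDispatcherTuning {
    public static OkHttpClient createTunedClient(ExecutorService executor) {
        Dispatcher dispatcher = new Dispatcher(executor);
        // Defaults are 64 total and 5 per host, which throttles highly parallel sync-over-async runs.
        dispatcher.setMaxRequests(256);
        dispatcher.setMaxRequestsPerHost(256);
        return new OkHttpClient.Builder()
            .dispatcher(dispatcher)
            .build();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(64);
        OkHttpClient client = createTunedClient(executor);
        // Supplying our own executor (and shutting it down) avoids waiting for OkHttp's
        // default dispatcher executor to idle out at process exit (point 9 above).
        executor.shutdown();
    }
}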

Profile

This section contains data from profiling a simple scenario that calls BlobServiceClient.getProperties() in a loop. See the code here.
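
A minimal sketch of that loop (the linked code also handles setup and configuration; the iteration count corresponds to the accountPropertiesCount property used below):

import com.azure.storage.blob.BlobServiceClient;

public final class GetPropertiesLoop {
    static void profile(BlobServiceClient blobServiceClient, int accountPropertiesCount) {
        // The profiled hot path: repeated synchronous BlobServiceClient.getProperties() calls.
        for (int i = 0; i < accountPropertiesCount; i++) {
            blobServiceClient.getProperties();
        }
    }
}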

JFR dumps have been obtained for each combination of HTTP client and full-sync vs sync-over-async implementation, i.e. the following commands have been run. The app was running against Azurite on the same host.

C:\tools\openjdk-17_windows-x64_bin\jdk-17\bin\java -Xms500m -Xmx500m -D"run.mode=profile" -D"httpClientType=okhttp" -D"accountPropertiesCount=100000" -XX:+FlightRecorder -XX:StartFlightRecording=settings=profile,filename=okhttp-full-sync.jfr -jar azure-storage-perf-full-sync.jar
C:\tools\openjdk-17_windows-x64_bin\jdk-17\bin\java -Xms500m -Xmx500m -D"run.mode=profile" -D"httpClientType=netty" -D"accountPropertiesCount=100000" -XX:+FlightRecorder -XX:StartFlightRecording=settings=profile,filename=netty-full-sync.jfr -jar azure-storage-perf-full-sync.jar
C:\tools\openjdk-17_windows-x64_bin\jdk-17\bin\java -Xms500m -Xmx500m -D"run.mode=profile" -D"httpClientType=jdk" -D"accountPropertiesCount=100000" -XX:+FlightRecorder -XX:StartFlightRecording=settings=profile,filename=jdk-full-sync.jfr -jar azure-storage-perf-full-sync.jar
C:\tools\openjdk-17_windows-x64_bin\jdk-17\bin\java -Xms500m -Xmx500m -D"run.mode=profile" -D"httpClientType=okhttp" -D"accountPropertiesCount=100000" -XX:+FlightRecorder -XX:StartFlightRecording=settings=profile,filename=okhttp-sync-over-async.jfr -jar azure-storage-perf-sync-over-async.jar
C:\tools\openjdk-17_windows-x64_bin\jdk-17\bin\java -Xms500m -Xmx500m -D"run.mode=profile" -D"httpClientType=netty" -D"accountPropertiesCount=100000" -XX:+FlightRecorder -XX:StartFlightRecording=settings=profile,filename=netty-sync-over-async.jfr -jar azure-storage-perf-sync-over-async.jar
C:\tools\openjdk-17_windows-x64_bin\jdk-17\bin\java -Xms500m -Xmx500m -D"run.mode=profile" -D"httpClientType=jdk" -D"accountPropertiesCount=100000" -XX:+FlightRecorder -XX:StartFlightRecording=settings=profile,filename=jdk-sync-over-async.jfr -jar azure-storage-perf-sync-over-async.jar

Results

Metrics

| Metric | OkHttp full sync | OkHttp sync-over-async | Netty full sync | Netty sync-over-async | JDK full sync | JDK sync-over-async |
| --- | --- | --- | --- | --- | --- | --- |
| Duration (s) | 112 | 186 | 128 | 145 | 137 | 152 |
| Total allocation (GB) | 4.08 | 4.46 | 4.74 | 5.01 | 5.40 | 6.85 |
| GC total time (ms) | 71.29 | 88.83 | 98.83 | 87.89 | 74.90 | 120.13 |
| GC count | 16 | 19 | 20 | 21 | 23 | 28 |
| GC young count | 16 | 18 | 19 | 20 | 22 | 27 |
| GC old count | 0 | 1 | 1 | 1 | 1 | 1 |

Threads

  • OkHttp full sync (image)
  • OkHttp sync-over-async (image)
  • Netty full sync (image)
  • Netty sync-over-async (image)
  • JDK full sync (image)
  • JDK sync-over-async (image)

Afterthoughts

Is it worth it?

Yes.

  1. Perf data indicates an improvement.
  2. The profiler indicates lower memory pressure.
  3. Complexity decreases; (almost) no more Reactor in the hot path.
  4. There are fewer threads involved and less switching in full sync. The caller thread is blocked anyway, so it might as well do the work.
  5. Fewer threads are required for fully synchronous scenarios.

Lessons learnt from the bodge

  1. Abstracting Reactor in a naive 1:1 attempt to build on top of its operators isn't sustainable. For some operators it might be difficult to write a sync equivalent (that's why SyncAsyncOperators were added where the logic had to be completely forked). Perhaps identifying and implementing larger chunks of logic would be better (i.e. 1:N against Reactor operators).
  2. Sync-async operators produce stack traces as ugly as Reactor's. This won't solve the debuggability problem well; it only has the potential to reduce code duplication.
  3. If pursued, sync-async operators should become an implementation detail used internally across SDKs. They don't seem to bring value to the end user.
  4. It makes sense not to use sync-async operators below generated code. Since there's one core, building a separate imperative sync stack might be a compromise that has to be taken to make debuggability better.
  5. Above generated code, usage of sync-async operators might make sense to reduce code duplication.
  6. Complex scenarios like buffered upload/download and storage streams likely won't benefit from such operators, or the operators would become complex, which again brings the problem of meaningless stack traces.
  7. Inventing a sync-async abstraction might reduce code duplication both above generated code and in tests (assuming that the public surface calls directly into that abstraction).
  8. There are ways to smuggle sync behavior through Reactor operators, i.e. by using things like Mono.just() or Mono.fromCallable() with some context switches (similar to how we eagerly buffer responses if the payload is XML or JSON).

What about binary data?

The bodge uses Stream<ByteBuffer> to mimic Flux<ByteBuffer>. That simplifies plowing through the layers and requires less work to achieve a sync stack. However, this still involves layers of conversion even if the underlying HttpClient can speak streams. We should explore how to avoid these conversions - perhaps BinaryData with a variety of content types would help here to defer or avoid conversion depending on consumption style.
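
For example, BinaryData already carries several content representations, which is what would let the HTTP layer pick the cheapest path (a sketch with placeholder file names):

import com.azure.core.util.BinaryData;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.file.Paths;

public class BinaryDataSources {
    public static void main(String[] args) {
        // Different sources keep their native representation until something consumes them.
        BinaryData fromBytes = BinaryData.fromBytes(new byte[] {1, 2, 3});
        BinaryData fromFile = BinaryData.fromFile(Paths.get("payload.bin"));
        BinaryData fromStream = BinaryData.fromStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));

        // A sync consumer can ask for a stream, an async consumer for Flux<ByteBuffer>,
        // without forcing an eager conversion at request-construction time.
        InputStream stream = fromStream.toStream();
        Long length = fromBytes.getLength();
    }
}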

Should we remove Reactor from internals entirely?

This could be possible if we based the internals on CompletableFuture and put Reactor-over-future just below the public API. But should a reactive user work with a fully reactive stack?
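
A rough sketch of that idea, with hypothetical method names (Mono.defer plus Mono.fromFuture is the Reactor adapter that would sit just below the public async API):

import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

public final class FutureBasedInternals {
    // Hypothetical internal operation implemented without Reactor.
    private static CompletableFuture<String> getPropertiesInternal() {
        return CompletableFuture.supplyAsync(() -> "properties");
    }

    // Public async surface: adapt the future lazily so every subscription triggers a new call.
    public static Mono<String> getPropertiesAsync() {
        return Mono.defer(() -> Mono.fromFuture(getPropertiesInternal()));
    }

    // Public sync surface: join the future directly, no Reactor in the path.
    public static String getProperties() {
        return getPropertiesInternal().join();
    }
}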

Options for HttpPipelinePolicy

Overview

This document presents options for how to introduce a sync API to HttpPipelinePolicy.

Current state

Currently the HttpPipelinePolicy is defined as

@FunctionalInterface
public interface HttpPipelinePolicy {

    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next);
    
    default HttpPipelinePosition getPipelinePosition() {
        return HttpPipelinePosition.PER_RETRY;
    }
}

Desired state

If we were starting from scratch the HttpPipelinePolicy would look like this

public interface HttpPipelinePolicy {

    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next);

    HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next);

    default HttpPipelinePosition getPipelinePosition() {
        return HttpPipelinePosition.PER_RETRY;
    }
}

With an extra base class for before/after non-I/O use cases:

public class HttpPipelineSynchronousPolicy implements HttpPipelinePolicy {

    @Override
    public final Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        ...
    }

    @Override
    public final HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        ...
    }

    protected void beforeSendingRequest(HttpPipelineCallContext context) {
        // empty by default
    }

    protected HttpResponse afterReceivedResponse(HttpPipelineCallContext context, HttpResponse response) {
        // empty by default
        return response;
    }
}

The obstacle

Given where we are today, going straight to the desired state is a breaking change; see here.

I.e. this can cause:

  • a compilation error for users that implement the interface
  • NoSuchMethodError if somebody attempts to use HttpPipelinePolicy implementations without recompiling them, i.e. uses the latest Azure SDK with old jars that bring custom policies (like this one).

Options

Introduce new API as default implementation and attempt to make it work

In this case we introduce the new API with a default implementation that blocks on the async version. We also add logic to HttpPipeline to "jump back" to the sync API in the chain.

@FunctionalInterface
public interface HttpPipelinePolicy {
    ...
    default HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        return process(context, next).block();
    }
    ...
}
public class HttpPipelineNextPolicy {
    ...
    public Mono<HttpResponse> process() {
        if (isSync && !Schedulers.isInNonBlockingThread()) {
            // Pipeline executes in synchronous style. We most likely got here via default implementation in the
            // HttpPipelinePolicy.processSynchronously so go back to sync style here.
            // Don't do this on non-blocking threads.
            return Mono.fromCallable(this::processSync);
        } else {
            // TODO either give up and sync-over-async here or farm this to Schedulers.boundedElastic().
        }
        ...
    }
    ...
}

Make a breaking change

In this case we introduce the new API without a default implementation, and we make a breaking change.

public interface HttpPipelinePolicy {
    ...
    HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next);
    ...
}

Introduce new API as default implementation and throw

In this case we introduce the new API with a default implementation that throws at runtime.

@FunctionalInterface
public interface HttpPipelinePolicy {
    ...
    default HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        throw new UnsupportedOperationException("Not implemented");
    }
    ...
}

Introduce HttpPipelinePolicyV2 and deprecate HttpPipelinePolicy and related APIs

In this case we deprecate the existing HttpPipelinePolicy and all associated APIs. We create HttpPipelinePolicyV2 and overloads that take it. Internally, we use the new types and adapt the old type using techniques similar to the "Introduce new API as default implementation and attempt to make it work" option.

@Deprecated
@FunctionalInterface
public interface HttpPipelinePolicy {
    ...
}
public interface HttpPipelinePolicyV2 {

    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next);

    HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next);

    default HttpPipelinePosition getPipelinePosition() {
        return HttpPipelinePosition.PER_RETRY;
    }
}
public final class BlobClientBuilder {
    @Deprecated
    @Override
    public BlobClientBuilder addPolicy(HttpPipelinePolicy pipelinePolicy){
        ...
    }

    @Override
    public BlobClientBuilder addPolicy(HttpPipelinePolicyV2 pipelinePolicy){
        ...
    }
}

Introduce separate HttpSyncPipelinePolicy

From Srikanta:

  • We introduce a new top-level policy HttpSyncPipelinePolicy similar to the current policy; it'll be a functional interface with a single method process().
  • HttpPipelineBuilder will have another overload policies(HttpSyncPipelinePolicy... policies).
  • All builders will have an overload for addCustomPolicy(HttpSyncPipelinePolicy policy).

This will keep the sync and async pipelines separate. It also allows the current async stack to remain unchanged. The decision to build the sync/async pipeline will happen at the time buildClient/buildAsyncClient is called, and if the pipeline policies are not homogeneous, we throw here.
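
An illustrative shape for this option (nothing here exists yet; the HttpSyncPipelineNextPolicy name is assumed for the sketch):

@FunctionalInterface
public interface HttpSyncPipelinePolicy {

    HttpResponse process(HttpPipelineCallContext context, HttpSyncPipelineNextPolicy next);

    default HttpPipelinePosition getPipelinePosition() {
        return HttpPipelinePosition.PER_RETRY;
    }
}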

Pros:

  • allows customers to make a decision as to which interface they want to implement when creating a custom policy
  • the sync process() method is not hidden by default implementation in the interface
  • no mixing of sync and async policies in a pipeline (can be a con too)
  • the current async pipeline will not have to change and none of the current policies will have to be updated

Cons:

  • we'll have 2 classes for each policy - sync and async
  • additional method overload should be added to every builder to add custom sync policy
  • cannot have sync and async policies in the same pipeline
  • the policy names are going to use the term sync while the build methods use async to distinguish between sync and async

Pros and cons

The options compared:

  1. Introduce new API as default implementation and attempt to make it work
  2. Make a breaking change
  3. Introduce new API as default implementation and throw
  4. Introduce HttpPipelinePolicyV2 and deprecate HttpPipelinePolicy and related APIs
  5. Introduce separate HttpSyncPipelinePolicy

The criteria they were evaluated against:

  • Doesn't break customer code compilation
  • Doesn't break at runtime (possible NoSuchMethodError)
  • (if we want to switch sync APIs to use sync stack)
  • Allows us to flip sync client APIs to sync stack under the hood (previously configured policies stop working)
  • Signals customer that they have to update their code
  • Can enforce "sync stack" in HttpPipeline layer (eventually, assuming that deprecation is noticed)

#!/bin/bash
export STORAGE_CONNECTION_STRING="REDACTED"
#testnames="downloadblobtofile"
#testnames="uploadblob"
#testnames="uploadblockblob"
testnames="uploadfromfile"
#httpclients="okhttp netty jdk"
#httpclients="okhttp"
#httpclients="netty"
httpclients="okhttp netty"
#httpclients="simple_netty"
parallels="1 10 30"
#parallels="10"
#parallels="30"
#syncasyncs="full-sync sync-over-async"
#syncasyncs="new-main-2 new-main old-main"
#syncasyncs="full-sync"
#syncasyncs="sync-over-async"
#syncasyncs="simple-netty"
syncasyncs="file-upload-new file-upload-old"
#sizes="10240 20480 30720"
#sizes="20480 30720"
#sizes="20480"
#sizes="1048576 10485760"
#sizes="10240 1048576"
#sizes="10240 51200 102400 512000 1048576 10485760"
sizes="10240 1048576 10485760 524288000"
#heap="1g"
heap="300m"
sync=""
#sync="--sync"
#warmup=120
warmup=30
#duration=120
duration=60
for counter in {1..15}
do
  for testname in $testnames
  do
    for size in $sizes
    do
      for parallel in $parallels
      do
        for httpclient in $httpclients
        do
          for syncasync in $syncasyncs
          do
            echo "Starting $testname $httpclient $syncasync parallel=$parallel size=$size $counter"
            set -x
            java -XX:+ExitOnOutOfMemoryError -Xms$heap -Xmx$heap -jar azure-storage-perf-$syncasync.jar $testname $sync --parallel $parallel --warmup $warmup --duration $duration --size $size --http-client $httpclient
            set +x
            echo "Finished $testname $httpclient $syncasync parallel=$parallel size=$size $counter"
            sleep 1
          done
        done
      done
    done
  done
done

#!/bin/bash
endpoint="REDACTED"
testnames="pipelinesend"
#httpclients="okhttp netty jdk"
#httpclients="okhttp"
#httpclients="netty"
httpclients="okhttp netty"
parallels="1 10 30"
#parallels="30"
#parallels="30"
syncasyncs="full-sync sync-over-async"
#syncasyncs="full-sync"
#syncasyncs="sync-over-async"
#sizes="10240 20480 30720 51200 102400 204800"
sizes="10240"
binarydatasources="bytes file stream"
#binarydatasources="bytes"
#warmup=120
warmup=120
#duration=120
duration=120
for testname in $testnames
do
  for binarydatasource in $binarydatasources
  do
    for parallel in $parallels
    do
      for httpclient in $httpclients
      do
        for size in $sizes
        do
          for syncasync in $syncasyncs
          do
            for counter in {1..15}
            do
              echo "Starting $testname $binarydatasource $httpclient $syncasync parallel=$parallel size=$size $counter"
              set -x
              java -Xms2g -Xmx2g -jar azure-core-perf-$syncasync.jar $testname --sync --parallel $parallel --warmup $warmup --duration $duration --size $size --http-client $httpclient --include-pipeline-policies --backend-type blobs --endpoint "$endpoint" --binary-data-source $binarydatasource
              set +x
              echo "Finished $testname $binarydatasource $httpclient $syncasync parallel=$parallel size=$size $counter"
              sleep 1
            done
          done
        done
      done
    done
  done
done

Second Prototype

Overview

This document presents the second attempt to build a synchronous stack and focuses on the HttpPipeline and HttpClient layers. The synchronous RestProxy is covered in Vinay's document.

This project has evolved beyond "sync stack" since inception. Therefore, plenty of the improvements discovered here are going to be applicable to async clients as well.

Ingredients

Full details can be found in the experimental branch.

Rest Proxy

Rest Proxy is discussed separately here.

BinaryData

BinaryData gets accessors, available in the core implementation packages, that allow access to the BinaryDataContent.

package com.azure.core.implementation.util;

public final class BinaryDataHelper {
    private static BinaryDataAccessor accessor;

    public interface BinaryDataAccessor {

        BinaryData createBinaryData(BinaryDataContent content);

        BinaryDataContent getContent(BinaryData binaryData);
    }
}

BinaryData should also get an API to support lazy Flux content. Currently BinaryData.fromFlux eagerly reads the payload for historical reasons.

HttpRequest

HttpRequest now accepts BinaryData as the content representation. It keeps the payload in BinaryData form internally while making sure the existing APIs still work. This defers format conversions until absolutely necessary and also allows optimizations to be applied in places where the content has to be accessed.

      public class HttpRequest {
          public HttpRequest(HttpMethod httpMethod, URL url) 
          public HttpRequest(HttpMethod httpMethod, String url) 
          public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, Flux<ByteBuffer> body) 
+         public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, BinaryData data) 
          public Flux<ByteBuffer> getBody() 
+         public BinaryData getBodyAsBinaryData() 
          public HttpRequest setBody(String content) 
          public HttpRequest setBody(byte[] content) 
          public HttpRequest setBody(Flux<ByteBuffer> content) 
+         public HttpRequest setBody(BinaryData data) 
          public HttpRequest copy() 
          public HttpRequest setHeader(String name, String value) 
          public HttpHeaders getHeaders() 
          public HttpRequest setHeaders(HttpHeaders headers) 
          public HttpMethod getHttpMethod() 
          public HttpRequest setHttpMethod(HttpMethod httpMethod) 
          public URL getUrl() 
          public HttpRequest setUrl(URL url) 
          public HttpRequest setUrl(String url) 
     }
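
A short usage sketch of the new constructor and accessor (placeholder URL and file name):

import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.util.BinaryData;

import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;

public class BinaryDataRequestExample {
    static HttpRequest fileUploadRequest() throws MalformedURLException {
        // The content stays in BinaryData form; the HttpClient implementation decides
        // later whether to send it as bytes, a file, a stream or Flux<ByteBuffer>.
        BinaryData body = BinaryData.fromFile(Paths.get("payload.bin"));
        return new HttpRequest(HttpMethod.PUT, new URL("https://example.com/blob"), new HttpHeaders(), body);
    }
}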

HttpResponse

HttpResponse can now return the payload in the form of BinaryData. This lets the HttpClient serve content in the form that's most appropriate for the context, e.g. a Flux for async calls, an InputStream for sync calls, or byte[] if eager buffering of the response has been requested. These optimizations are best effort and depend on the HttpClient implementation.

Additionally, HttpResponse gets a set of writeBodyTo methods that cover types like OutputStream or Channel and can write the bytes to the destination in an optimized way (i.e. using much less memory).

      public abstract class HttpResponse implements Closeable {
          protected HttpResponse(HttpRequest request) 
          public abstract Flux<ByteBuffer> getBody() 
          public abstract Mono<byte[]> getBodyAsByteArray() 
          public Mono<InputStream> getBodyAsInputStream() 
          public abstract Mono<String> getBodyAsString() 
          public abstract Mono<String> getBodyAsString(Charset charset) 
+         public BinaryData getBodyAsBinaryData() 
          public HttpResponse buffer() 
          @Override public void close()
          public abstract HttpHeaders getHeaders() 
          public abstract String getHeaderValue(String name) 
          public final HttpRequest getRequest() 
          public abstract int getStatusCode() 
+         public void writeBodyTo(OutputStream outputStream) throws IOException
+         public void writeBodyTo(WritableByteChannel channel) throws IOException
+         public void writeBodyTo(FileChannel channel, long position) throws IOException
+         public Mono<Void> writeBodyTo(AsynchronousFileChannel asynchronousFileChannel, long position)
      }
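
A usage sketch of the synchronous streaming write (the pipeline and request are assumed to be set up elsewhere):

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class WriteBodyToExample {
    static void downloadTo(HttpPipeline pipeline, HttpRequest request, Path destination) throws IOException {
        // writeBodyTo streams the payload to the destination instead of buffering it all in memory.
        try (HttpResponse response = pipeline.sendSync(request, Context.NONE);
             OutputStream stream = Files.newOutputStream(destination)) {
            response.writeBodyTo(stream);
        }
    }
}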

HttpPipeline

The HttpPipeline gets an extra set of synchronous APIs that mirror the reactive counterparts. We probably need just one of those.

      public final class HttpPipeline {
          // This class does not have any public constructors, and is not able to be instantiated using 'new'.
          public HttpClient getHttpClient() 
          public HttpPipelinePolicy getPolicy(int index) 
          public int getPolicyCount() 
          public Mono<HttpResponse> send(HttpRequest request) 
          public Mono<HttpResponse> send(HttpPipelineCallContext context) 
          public Mono<HttpResponse> send(HttpRequest request, Context data) 
+         public HttpResponse sendSync(HttpRequest request, Context data) 
      }
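
A usage sketch of the synchronous entry point (placeholder URL; the default HttpClient is used for brevity):

import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpPipelineBuilder;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.Context;

public class SendSyncExample {
    public static void main(String[] args) {
        HttpPipeline pipeline = new HttpPipelineBuilder()
            .httpClient(HttpClient.createDefault())
            .build();

        HttpRequest request = new HttpRequest(HttpMethod.GET, "https://example.com");
        // No Mono anywhere on this call path; policies that override processSync stay synchronous.
        try (HttpResponse response = pipeline.sendSync(request, Context.NONE)) {
            System.out.println(response.getStatusCode());
        }
    }
}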

HttpPipelinePolicy

The HttpPipelinePolicy got an extra synchronous API with a default implementation that blocks on the async API.

      @FunctionalInterface
      public interface HttpPipelinePolicy {
          default HttpPipelinePosition getPipelinePosition() 
          Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) 
+         default HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next) 
+     }
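
A sketch of a custom policy that overrides both variants so it never falls back to the blocking default (the header name is a placeholder):

import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;

import java.util.UUID;

import reactor.core.publisher.Mono;

public class RequestIdPolicy implements HttpPipelinePolicy {
    @Override
    public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        context.getHttpRequest().setHeader("x-ms-client-request-id", UUID.randomUUID().toString());
        return next.process();
    }

    @Override
    public HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        context.getHttpRequest().setHeader("x-ms-client-request-id", UUID.randomUUID().toString());
        return next.processSync();
    }
}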

HttpPipelineNextPolicy

The HttpPipelineNextPolicy got a new synchronous API.

      public class HttpPipelineNextPolicy {
          // This class does not have any public constructors, and is not able to be instantiated using 'new'.
          @Override public HttpPipelineNextPolicy clone() 
          public Mono<HttpResponse> process() 
+         public HttpResponse processSync() 
      }

In addition to that, HttpPipelineNextPolicy keeps track of whether the HttpPipeline was called synchronously or not and tries to get it back on the right track.

public class HttpPipelineNextPolicy {
    ...
    private final boolean isSynchronous;
    ...
    public Mono<HttpResponse> process() {
        if (isSynchronous && !Schedulers.isInNonBlockingThread()) {
            // Pipeline executes in synchronous style. We most likely got here via default implementation in the
            // HttpPipelinePolicy.processSync so go back to sync style here.
            // Don't do this on non-blocking threads.
            return Mono.fromCallable(this::processSync);
        } else {
            if (isSynchronous) {
                LOGGER.warning("The pipeline switched from synchronous to asynchronous."
                    + " Check if all policies override HttpPipelinePolicy.processSynchronously");
            }
            ...
        }
    }

    public HttpResponse processSync() {
        if (!isSynchronous) {
            throw LOGGER.logExceptionAsError(new IllegalStateException(
                "Must not use HttpPipelineNextPolicy.processSynchronously in asynchronous HttpPipeline invocation."));
        }
        ...
    }
    ...
}

HttpPipelineSynchronousPolicy

A new abstract class, HttpPipelineSynchronousPolicy, has been introduced to cover cases where pre- and post-request actions are purely synchronous. This reduces the code duplication involved in writing simple policies like this.

public class HttpPipelineSynchronousPolicy implements HttpPipelinePolicy {
    public HttpPipelineSynchronousPolicy()
    protected HttpResponse afterReceivedResponse(HttpPipelineCallContext context, HttpResponse response) 
    protected void beforeSendingRequest(HttpPipelineCallContext context) 
    @Override public final Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) 
    @Override public final HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextPolicy next) 
}

Example usage:

public class AddDatePolicy extends HttpPipelineSynchronousPolicy {
    private final DateTimeFormatter format = DateTimeFormatter
            .ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'")
            .withZone(ZoneId.of("UTC"))
            .withLocale(Locale.US);

    @Override
    protected void beforeSendingRequest(HttpPipelineCallContext context) {
        context.getHttpRequest().getHeaders().set("Date", format.format(OffsetDateTime.now()));
    }
}

HttpClient

The HttpClient interface got a new synchronous method with a default implementation that blocks on the async call.

      public interface HttpClient {
          static HttpClient createDefault() 
          static HttpClient createDefault(HttpClientOptions clientOptions) 
          Mono<HttpResponse> send(HttpRequest request) 
          default Mono<HttpResponse> send(HttpRequest request, Context context) 
+         default HttpResponse sendSync(HttpRequest request, Context context) 
      }
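
A simplified sketch of what that default boils down to (not the exact azure-core source; implementations are expected to override it):

      public interface HttpClient {
          ...
          // Sync-over-async unless the implementation overrides it.
          default HttpResponse sendSync(HttpRequest request, Context context) {
              return send(request, context).block();
          }
          ...
      }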

The implementations of HttpClient then choose what to do and what the best way of handling the request content is.

There's a little bit of controversy around "Async" in the HttpClient implementation names and their builders. However, a design where we'd have separate HttpClient and SynchronousHttpClient types with respective implementations could potentially create a configuration nightmare, i.e. what should happen if somebody calls new FooClientBuilder().buildFooClient() but provided only an async client. Instead of this we'd rather create new types and deprecate the existing ones containing "Async", or just leave it as is, assuming that customers would rather use HttpClientOptions over playing with HttpClient builders.

class OkHttpAsyncHttpClient implements HttpClient {
    ...
    @Override
    public HttpResponse sendSync(HttpRequest request, Context context) {
        boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);

        Request okHttpRequest = toOkHttpRequestSynchronously(request);
        Call call = httpClient.newCall(okHttpRequest);
        try {
            Response okHttpResponse = call.execute();
            return fromOkHttpResponse(okHttpResponse, request, eagerlyReadResponse);
        } catch (IOException e) {
            throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
        }
    }
    ...
    private static RequestBody toOkHttpRequestBodySynchronously(BinaryData bodyContent, HttpHeaders headers) {
        String contentType = headers.getValue("Content-Type");
        MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);


        if (bodyContent == null) {
            return RequestBody.create(ByteString.EMPTY, mediaType);
        }

        BinaryDataContent content = BinaryDataHelper.getContent(bodyContent);
        if (content instanceof ByteArrayContent) {
            return RequestBody.create(content.toBytes(), mediaType);
        } else if (content instanceof FileContent) {
            FileContent fileContent = (FileContent) content;
            // This won't be right all the time as we may be sending only a partial view of the file.
            // TODO (alzimmer): support ranges in FileContent
            return RequestBody.create(fileContent.getFile().toFile(), mediaType);
        } else if (content instanceof StringContent) {
            return RequestBody.create(bodyContent.toString(), mediaType);
        } else if (content instanceof InputStreamContent) {
            Long contentLength = content.getLength();
            if (contentLength == null) {
                String contentLengthHeaderValue = headers.getValue("Content-Length");
                if (contentLengthHeaderValue != null) {
                    contentLength = Long.parseLong(contentLengthHeaderValue);
                } else {
                    contentLength = -1L;
                }
            }
            long effectiveContentLength = contentLength;
            return new RequestBody() {
                @Override
                public MediaType contentType() {
                    return mediaType;
                }

                @Override
                public long contentLength() {
                    return effectiveContentLength;
                }

                @Override
                public void writeTo(BufferedSink bufferedSink) throws IOException {
                    // TODO (kasobol-msft) OkHttp client can retry internally so we should add mark/reset here
                    // and fallback to buffering if that's not supported.
                    // We should also consider adding InputStreamSupplierBinaryDataContent type where customer can
                    // give a prescription how to acquire/re-acquire an InputStream.
                    Source source = Okio.source(content.toStream());
                    bufferedSink.writeAll(source);
                }
            };
        } else {
            // TODO (kasobol-msft) is there better way than just block? perhaps throw?
            // Perhaps we could consider using one of storage's stream implementation on top of flux?
            // Or maybe implement that OkHttp sink and get rid off reading to string altogether.
            return toByteString(bodyContent.toFluxByteBuffer()).map(bs -> RequestBody.create(bs, mediaType)).block();
        }
    }
    ...
}
class NettyAsyncHttpClient implements HttpClient {
    ...
    // This implementation doesn't override default implementation as it wouldn't do anything smart about it anyway.
    ...
    private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bodySendDelegate(
        final HttpRequest restRequest) {
            ...
            BinaryDataContent binaryDataContent = BinaryDataHelper.getContent(restRequest.getContent());
            if (binaryDataContent instanceof ByteArrayContent) {
                return reactorNettyOutbound.send(Mono.just(Unpooled.wrappedBuffer(binaryDataContent.toBytes())));
            } else if (binaryDataContent instanceof StringContent) {
                return reactorNettyOutbound.send(Mono.fromSupplier(
                    () -> Unpooled.wrappedBuffer(binaryDataContent.toBytes())));
            } else if (binaryDataContent instanceof FileContent) {
                FileContent fileContent = (FileContent) binaryDataContent;
                // fileContent.getLength() is always not null in FileContent.
                if (restRequest.getUrl().getProtocol().equals("https")) {
                    // NettyOutbound uses such logic internally for ssl connections but with smaller buffer of 1KB.
                    return reactorNettyOutbound.sendUsing(
                        () -> FileChannel.open(fileContent.getFile(), StandardOpenOption.READ),
                        (c, fc) -> {
                            if (c.channel().pipeline().get(ChunkedWriteHandler.class) == null) {
                                c.addHandlerLast("reactor.left.chunkedWriter", new ChunkedWriteHandler());
                            }

                            try {
                                return new ChunkedNioFile(
                                    fc, fileContent.getPosition(), fileContent.getLength(), fileContent.getChunkSize());
                            } catch (IOException e) {
                                throw Exceptions.propagate(e);
                            }
                        },
                        (fc) -> {
                            try {
                                fc.close();
                            } catch (IOException e) {
                                LOGGER.log(LogLevel.VERBOSE, () -> "Could not close file", e);
                            }
                        });
                } else {
                    // Beware of NettyOutbound.sendFile(Path) it involves extra file length lookup.
                    // This is going to use zero-copy transfer if there's no ssl
                    return reactorNettyOutbound.sendFile(
                        fileContent.getFile(), fileContent.getPosition(), fileContent.getLength());
                }
            } else if (binaryDataContent instanceof StringContent) {
                return reactorNettyOutbound.sendString(Mono.just(binaryDataContent.toString()));
            } else if (binaryDataContent instanceof InputStreamContent) {
                return reactorNettyOutbound.sendUsing(
                    binaryDataContent::toStream,
                    (c, stream) -> {
                        if (c.channel().pipeline().get(ChunkedWriteHandler.class) == null) {
                            c.addHandlerLast("reactor.left.chunkedWriter", new ChunkedWriteHandler());
                        }

                        return new ChunkedStream(stream);
                    },
                    (stream) -> {
                        try {
                            stream.close();
                        } catch (IOException e) {
                            LOGGER.log(LogLevel.VERBOSE, () -> "Could not close stream", e);
                        }
                    });
            } else {
                Flux<ByteBuf> nettyByteBufFlux = restRequest.getBody().map(Unpooled::wrappedBuffer);
                return reactorNettyOutbound.send(nettyByteBufFlux);
            }
            ...
    }
    ...
}
public final class NettyAsyncHttpResponse extends NettyAsyncHttpResponseBase {
    ...
    @Override
    public void writeBodyTo(OutputStream outputStream) throws IOException {
        // TODO (kasobol-msft) handle other cases optimizations from ImplUtils.writeByteBufferToStream.
        // However it seems that buffers here don't have backing arrays. And for files we should probably have
        // writeTo(Channel) API.
        byte[] buffer = new byte[8 * 1024];
        bodyIntern().flatMap(byteBuff -> {
            while (byteBuff.isReadable()) {
                try {
                    // TODO (kasobol-msft) this could be optimized further,i.e. make sure we're utilizing
                    // whole buffer before passing to outputstream.
                    int numberOfBytes = Math.min(buffer.length, byteBuff.readableBytes());
                    byteBuff.readBytes(buffer, 0, numberOfBytes);
                    // TODO (kasobol-msft) consider farming this out to Schedulers.boundedElastic.
                    // https://github.com/reactor/reactor-netty/issues/2096#issuecomment-1068832894
                    outputStream.write(buffer, 0, numberOfBytes);
                } catch (IOException e) {
                    return Mono.error(e);
                }
            }
            return Mono.empty();
        }).blockLast();
    }
    ...
}
public final class OkHttpAsyncResponse extends OkHttpAsyncResponseBase {
    ...
    @Override
    public void writeBodyTo(OutputStream outputStream) throws IOException {
        StreamUtils.INSTANCE.transfer(responseBody.byteStream(), outputStream);
    }
    ...
}

Test Aids

Branching off a sync stack means that the sync and async clients are going to demand equal test coverage. In order to facilitate this we should be building test extensions like the one presented below.

    @SyncAsyncTest
    public void noRedirectPolicyTest() throws Exception {
        final HttpPipeline pipeline = new HttpPipelineBuilder()
            .httpClient(new NoOpHttpClient() {

                @Override
                public Mono<HttpResponse> send(HttpRequest request) {
                    if (request.getUrl().toString().equals("http://localhost/")) {
                        Map<String, String> headers = new HashMap<>();
                        headers.put("Location", "http://redirecthost/");
                        HttpHeaders httpHeader = new HttpHeaders(headers);
                        return Mono.just(new MockHttpResponse(request, 308, httpHeader));
                    } else {
                        return Mono.just(new MockHttpResponse(request, 200));
                    }
                }
            })
            .build();

        HttpResponse response = SyncAsyncExtension.execute(
            () -> pipeline.send(new HttpRequest(HttpMethod.GET,
                new URL("http://localhost/"))).block(),
            () -> pipeline.sendSync(new HttpRequest(HttpMethod.GET,
                new URL("http://localhost/")))
        );

        assertEquals(308, response.getStatusCode());
    }
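
The SyncAsyncExtension used above is what keeps the branched stacks from doubling every test body: the same test runs twice, once materializing the async callable (blocking on it) and once the sync one. A simplified sketch of the idea, not the actual extension (a real implementation would be a JUnit 5 test template that flips the flag per invocation):

import java.util.concurrent.Callable;

public final class SyncAsyncExtensionSketch {
    // Flipped by the (hypothetical) test template between the two invocations of a @SyncAsyncTest.
    private static final ThreadLocal<Boolean> RUN_SYNC = ThreadLocal.withInitial(() -> Boolean.FALSE);

    private SyncAsyncExtensionSketch() {
    }

    public static <T> T execute(Callable<T> asyncCall, Callable<T> syncCall) {
        try {
            // Run exactly one of the two legs so that a single test body covers both stacks.
            return RUN_SYNC.get() ? syncCall.call() : asyncCall.call();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static void setRunSync(boolean runSync) {
        RUN_SYNC.set(runSync);
    }
}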

Stack traces

Simulated DNS failure in perf runs, i.e. --endpoint "https://foo".

Netty

New

java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
        at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:241)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
        at com.azure.core.perf.App.main(App.java:20)
Caused by: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
        at reactor.core.Exceptions.propagate(Exceptions.java:392)
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
        at reactor.core.publisher.Mono.block(Mono.java:1707)
        at com.azure.core.http.HttpClient.sendSync(HttpClient.java:42)
        at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:78)
        at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
        at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:139)
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:112)
        at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:54)
        at com.azure.perf.test.core.PerfStressTest.runTest(PerfStressTest.java:31)
        at com.azure.perf.test.core.ApiPerfTestBase.runAll(ApiPerfTestBase.java:117)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$16(PerfStressProgram.java:240)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
        at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
        at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
        at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$17(PerfStressProgram.java:240)
        at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
                ... 32 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
        at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
        at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
        at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
        at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:169)
        at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:166)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at io.netty.util.internal.SocketUtils.allAddressesByName(SocketUtils.java:166)
        at io.netty.resolver.DefaultNameResolver.doResolveAll(DefaultNameResolver.java:50)
        at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79)
        at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:71)
        at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:73)
        at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:31)
        at io.netty.resolver.AbstractAddressResolver.resolveAll(AbstractAddressResolver.java:158)
        at reactor.netty.transport.TransportConnector.doResolveAndConnect(TransportConnector.java:283)
        at reactor.netty.transport.TransportConnector.lambda$connect$6(TransportConnector.java:110)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
        at reactor.netty.transport.TransportConnector$MonoChannelPromise._subscribe(TransportConnector.java:579)
        at reactor.netty.transport.TransportConnector$MonoChannelPromise.lambda$subscribe$0(TransportConnector.java:499)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

Old

java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
        at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:237)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
        at com.azure.core.perf.App.main(App.java:20)
Caused by: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
        at reactor.core.Exceptions.propagate(Exceptions.java:392)
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
        at reactor.core.publisher.Mono.block(Mono.java:1707)
        at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:46)
        at com.azure.perf.test.core.PerfStressProgram.runLoop(PerfStressProgram.java:285)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$14(PerfStressProgram.java:236)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
        at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
        at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
        at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$15(PerfStressProgram.java:236)
        at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
                ... 25 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
        at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
        at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
        at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
        at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:169)
        at io.netty.util.internal.SocketUtils$9.run(SocketUtils.java:166)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at io.netty.util.internal.SocketUtils.allAddressesByName(SocketUtils.java:166)
        at io.netty.resolver.DefaultNameResolver.doResolveAll(DefaultNameResolver.java:50)
        at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79)
        at io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:71)
        at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:73)
        at io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:31)
        at io.netty.resolver.AbstractAddressResolver.resolveAll(AbstractAddressResolver.java:158)
        at reactor.netty.transport.TransportConnector.doResolveAndConnect(TransportConnector.java:283)
        at reactor.netty.transport.TransportConnector.lambda$connect$6(TransportConnector.java:110)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
        at reactor.netty.transport.TransportConnector$MonoChannelPromise._subscribe(TransportConnector.java:579)
        at reactor.netty.transport.TransportConnector$MonoChannelPromise.lambda$subscribe$0(TransportConnector.java:499)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

OkHttp

New

java.util.concurrent.ExecutionException: java.io.UncheckedIOException: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
        at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:241)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
        at com.azure.core.perf.App.main(App.java:20)
Caused by: java.io.UncheckedIOException: java.net.UnknownHostException: No such host is known (foo)
        at com.azure.core.http.okhttp.OkHttpAsyncHttpClient.sendSync(OkHttpAsyncHttpClient.java:102)
        at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:78)
        at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
        at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:139)
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:112)
        at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:54)
        at com.azure.perf.test.core.PerfStressTest.runTest(PerfStressTest.java:31)
        at com.azure.perf.test.core.ApiPerfTestBase.runAll(ApiPerfTestBase.java:117)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$16(PerfStressProgram.java:240)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
        at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
        at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
        at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$17(PerfStressProgram.java:240)
        at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
        at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
        at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
        at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
        at okhttp3.Dns$Companion$DnsSystem.lookup(Dns.kt:49)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.kt:164)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.kt:129)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.kt:71)
        at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.kt:205)
        at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.kt:106)
        at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.kt:74)
        at okhttp3.internal.connection.RealCall.initExchange$okhttp(RealCall.kt:255)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:32)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
        at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
        at com.azure.core.http.okhttp.OkHttpAsyncHttpClient.sendSync(OkHttpAsyncHttpClient.java:99)
        ... 30 more

Old

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
        at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:259)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
        at com.azure.core.perf.App.main(App.java:20)
Caused by: java.util.concurrent.ExecutionException: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
        at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:237)
        ... 3 more
Caused by: reactor.core.Exceptions$ReactiveException: java.net.UnknownHostException: No such host is known (foo)
        at reactor.core.Exceptions.propagate(Exceptions.java:392)
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
        at reactor.core.publisher.Mono.block(Mono.java:1707)
        at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:46)
        at com.azure.perf.test.core.PerfStressProgram.runLoop(PerfStressProgram.java:285)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$14(PerfStressProgram.java:236)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
        at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
        at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
        at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$15(PerfStressProgram.java:236)
        at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
                ... 25 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
        at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
        at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
        at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
        at okhttp3.Dns$Companion$DnsSystem.lookup(Dns.kt:49)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.kt:164)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.kt:129)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.kt:71)
        at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.kt:205)
        at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.kt:106)
        at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.kt:74)
        at okhttp3.internal.connection.RealCall.initExchange$okhttp(RealCall.kt:255)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:32)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
        at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
        at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:517)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

Simple Netty

java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1006)
        at com.azure.perf.test.core.PerfStressProgram.runTests(PerfStressProgram.java:241)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:163)
        at com.azure.perf.test.core.PerfStressProgram.run(PerfStressProgram.java:91)
        at com.azure.core.perf.App.main(App.java:20)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:600)
        ... 5 more
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
        at com.azure.core.http.netty.SimpleNettyHttpClient.sendSync(SimpleNettyHttpClient.java:76)
        at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:78)
        at com.azure.core.http.policy.HttpPipelineSynchronousPolicy.processSynchronously(HttpPipelineSynchronousPolicy.java:35)
        at com.azure.core.http.HttpPipelineNextPolicy.processSynchronously(HttpPipelineNextPolicy.java:81)
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:139)
        at com.azure.core.http.HttpPipeline.sendSync(HttpPipeline.java:112)
        at com.azure.core.perf.PipelineSendTest.run(PipelineSendTest.java:54)
        at com.azure.perf.test.core.PerfStressTest.runTest(PerfStressTest.java:31)
        at com.azure.perf.test.core.ApiPerfTestBase.runAll(ApiPerfTestBase.java:117)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$16(PerfStressProgram.java:240)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:204)
        at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
        at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
        at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:408)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:188)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.IntPipeline.forEach(IntPipeline.java:439)
        at java.base/java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:596)
        at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$17(PerfStressProgram.java:240)
        at java.base/java.util.concurrent.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1407)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: java.util.concurrent.ExecutionException: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
        at com.azure.core.http.netty.SimpleNettyHttpClient.sendSync(SimpleNettyHttpClient.java:74)
        ... 30 more
Caused by: java.net.UnknownHostException: No such host is known (foo)
        at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:928)
        at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1514)
        at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:847)
        at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1504)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1363)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1297)
        at java.base/java.net.InetAddress.getByName(InetAddress.java:1247)
        at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
        at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
        at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
        at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
        at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
        at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
        at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
        at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
        at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
        at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
        at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
        at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
        at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Thread.java:834)

Benchmarks

This section contains results of perf runs where PipelineSendTest from the main branch and from the feature/sync-stack branch has been run with various input types. The version from the main branch represents sync-over-async; the version from the feature/sync-stack branch represents the sync stack. Additional runs with the Simple Netty prototype have been performed.

Tests were run on a D4ds v5 VM against Premium Blobs in the same region. The procedure can be found in the run.sh and stats.py scripts attached to this gist.

How to read the data

I had to replace the storage account and re-run the Netty benchmarks while iterating on it. So please:

  • Look at how new vs. old compares for a given HTTP client type.
  • Don't try to compare Netty vs. OkHttp using this data.
  • Simple Netty has not been explored deeply, as early runs revealed it is still far from ready to handle higher concurrency and larger payloads.

Pipeline - Upload byte array

10KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 228.09±3.82 225.37±3.03 222.50±3.10 218.16±4.57 212.45±3.28
10 2317.76±51.06 2328.51±19.09 2293.36±32.83 2352.69±38.57 2205.81±10.51
30 6793.07±67.15 6763.78±53.99 6830.02±128.09 6191.75±442.77 6583.52±54.77

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 232.30 229.24 228.42 228.72 217.97
10 2411.39 2364.73 2336.98 2424.97 2225.90
30 6906.07 6858.52 7018.93 6754.49 6661.29

20KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 211.82±3.00 215.43±2.16 221.09±5.56 231.09±2.95 204.80±2.08
10 2257.15±18.39 2217.50±58.82 2363.11±39.99 2401.59±29.81 2065.92±43.76
30 6462.38±69.80 6293.62±91.62 6873.53±87.58 7092.81±70.57 6352.10±58.58

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 217.13 219.54 228.43 236.38 208.89
10 2278.97 2281.07 2435.68 2443.56 2139.64
30 6536.53 6397.16 6996.67 7252.83 6437.81

30KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 207.03±3.17 206.77±4.55 220.93±2.22 225.30±2.56 200.03±4.03
10 2119.89±29.20 1962.91±32.21 2233.86±23.46 2370.91±20.25 1492.26±91.09
30 6125.67±206.45 6364.71±62.21 6699.11±99.18 6932.81±45.38 5141.70±360.60

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 210.66 214.50 225.73 228.52 206.45
10 2155.79 2026.27 2270.99 2407.24 1575.42
30 6383.51 6464.03 6826.51 6990.66 5697.99

1MB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 43.20±0.69 42.06±0.83 41.82±0.56 42.84±1.55
10 437.41±7.61 427.41±6.39 437.29±5.58 453.33±4.54
30 1326.22±10.07 1220.85±10.69 1299.35±22.48 1357.68±28.30

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 44.70 43.46 42.49 45.09
10 448.48 438.49 447.11 462.66
30 1346.72 1246.38 1325.09 1391.15

10MB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 12.12±0.28 11.53±0.14 11.80±0.09 11.87±0.08
10 120.04±1.24 107.61±1.28 114.99±1.86 116.81±1.84
30 142.81±0.04 120.44±0.64 143.04±0.06 143.01±0.01

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 12.56 11.79 11.92 11.99
10 121.64 109.74 117.50 119.61
30 142.92 121.43 143.15 143.03

Pipeline - Upload from file

10KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 223.95±3.94 213.39±4.53 220.64±4.81 218.28±4.62 212.65±4.20
10 2350.04±18.26 2298.17±14.76 2411.67±27.93 2249.97±32.39 2142.36±52.36
30 6747.74±169.40 6202.92±42.33 6827.73±68.30 6625.91±48.80 6702.13±40.87

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 231.77 219.90 227.74 226.04 218.66
10 2396.38 2323.75 2444.11 2311.16 2202.69
30 6946.93 6252.13 6947.54 6699.03 6789.87

20KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 214.21±2.96 210.32±3.26 206.66±2.46 210.63±3.11 206.98±2.97
10 2274.02±18.96 2218.15±35.37 2320.26±19.61 2245.78±18.93 2117.79±21.32
30 6294.20±210.31 6258.29±55.94 6754.98±107.02 6404.29±124.94 6333.64±112.42

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 222.08 214.80 211.44 216.34 213.15
10 2307.67 2274.78 2355.74 2276.77 2153.00
30 6642.96 6334.55 6949.71 6656.56 6513.92

30KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 212.36±3.05 205.35±4.29 215.53±6.90 205.59±3.02 211.85±2.50
10 2092.59±45.83 2040.19±43.83 2241.21±34.35 2214.69±36.29 1502.88±68.28
30 6404.09±84.52 6167.87±61.41 6583.26±85.42 6433.37±61.16 5119.79±29.88

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 218.35 212.55 226.61 211.42 217.09
10 2145.27 2106.43 2308.51 2273.51 1607.09
30 6494.87 6233.74 6686.30 6497.07 5163.48

1MB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 51.98±0.98 46.87±0.92 50.58±0.88 50.86±0.86
10 528.79±7.95 439.15±3.81 511.54±6.50 458.72±5.19
30 1421.51±5.94 OOM 1395.93±23.02 504.52±6.89

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 53.43 47.90 51.61 52.07
10 540.16 444.91 518.55 468.80
30 1426.61 OOM 1425.34 514.34

10MB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 16.38±0.20 13.40±0.13 16.40±0.30 15.98±0.14
10 142.76±0.03 18.13±1.09 142.73±0.01 52.84±0.33
30 142.84±0.05 OOM 142.75±0.05 53.29±0.23

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 16.59 13.54 16.90 16.19
10 142.85 18.90 142.75 53.19
30 142.94 OOM 142.88 53.65

Pipeline - Upload from InputStream

10KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 213.43±12.88 206.78±6.93 229.60±4.98 228.44±3.18 208.54±4.06
10 2198.63±83.95 2203.22±49.05 2373.41±51.72 2405.36±22.22 2201.72±19.86
30 6507.52±216.94 6419.10±153.63 7040.99±106.53 7162.36±133.09 6600.42±125.62

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 228.81 217.76 235.19 231.88 214.31
10 2311.36 2272.92 2452.87 2436.90 2226.08
30 6862.53 6657.20 7221.24 7334.82 6786.06

20KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 208.51±9.51 209.98±6.14 224.65±6.38 229.49±4.62 211.62±2.32
10 2197.06±55.73 2156.12±66.47 2372.08±54.85 2411.11±16.51 2086.71±36.00
30 6312.03±163.45 6265.88±180.71 6866.54±184.08 6923.11±40.12 6333.45±41.65

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 220.03 222.20 232.11 238.25 214.84
10 2295.41 2275.85 2429.50 2449.42 2138.61
30 6561.12 6600.87 7178.04 6984.13 6384.04

30KB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 211.40±7.30 205.73±6.26 216.86±4.99 226.42±7.29 141.33±79.15
10 2149.53±59.13 2092.08±54.41 2401.58±17.56 2405.72±11.77 1504.34±77.66
30 6273.76±111.70 6157.07±92.96 6727.61±199.28 6373.85±93.03 5688.02±63.68

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 223.60 217.07 226.05 240.78 208.28
10 2246.87 2190.64 2424.31 2421.89 1625.92
30 6459.22 6326.71 7003.69 6516.26 5815.88

1MB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 49.01±1.70 46.00±0.90 50.42±0.89 51.06±1.12
10 488.54±8.50 466.18±5.85 515.79±4.67 523.06±5.95
30 1422.14±9.00 1264.74±14.33 1425.32±2.01 1426.93±3.79

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 52.51 47.56 52.11 52.90
10 499.92 476.06 524.21 534.80
30 1427.48 1286.76 1426.32 1429.47

10MB

Avg throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 14.97±0.33 13.38±0.43 16.67±0.26 16.56±0.25
10 142.68±0.10 117.86±1.95 142.73±0.01 142.87±0.02
30 142.77±0.01 117.70±1.46 142.76±0.02 142.85±0.02

Max throughput (ops/s)

Concurrency | OkHttp new | OkHttp old | Netty new | Netty old | Simple Netty
1 15.44 14.17 17.07 16.76
10 142.74 120.76 142.75 142.91
30 142.79 119.93 142.78 142.87

Storage - Download - OutputStream

BlobClient.download(OutputStream stream)

10KB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 469.56±15.07 438.03±19.95 445.74±8.69 417.44±13.39
10 4584.91±60.28 4257.45±43.58 4350.17±70.69 4090.19±42.84
30 13348.24±154.31 10453.08±101.88 12279.32±82.65 10540.85±125.06

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 502.78 473.13 461.43 450.56
10 4719.62 4312.81 4474.37 4157.29
30 13590.36 10589.60 12401.56 10703.58

20KB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 449.04±14.74 420.60±14.54 432.71±24.21 400.10±19.09
10 4412.23±43.30 4110.69±48.88 4160.19±53.83 3946.77±63.24
30 12726.46±160.62 9378.05±131.00 11584.03±87.48 9877.09±89.70

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 471.53 457.25 474.43 440.54
10 4491.06 4203.64 4238.92 4055.79
30 13013.68 9640.14 11734.63 10011.26

30KB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 440.17±13.31 407.20±13.10 411.63±14.31 391.53±11.98
10 4315.55±84.66 3987.70±60.79 3992.01±58.71 3780.92±37.07
30 12197.80±152.72 8645.24±96.25 11021.50±141.60 9275.52±53.97

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 462.15 438.62 437.59 413.90
10 4454.84 4083.45 4058.69 3826.43
30 12408.60 8800.95 11292.93 9401.83

1MB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 145.21±3.84 122.12±7.19 135.85±3.26 121.38±2.75
10 1210.97±29.98 966.41±10.20 1109.48±15.40 957.30±7.44
30 1425.91±25.08 1155.34±8.51 1503.51±16.28 1333.04±8.06

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 152.24 138.38 143.14 126.90
10 1275.81 991.07 1132.58 968.83
30 1480.66 1176.24 1543.86 1343.49

10MB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 23.86±0.91 20.70±0.73 21.65±0.74 21.92±0.92
10 151.35±4.63 123.47±0.67 154.21±4.24 135.90±2.44
30 154.35±1.59 125.28±0.88 170.27±1.29 151.37±0.47

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 25.70 21.74 22.83 23.32
10 162.57 124.88 162.37 140.95
30 156.61 127.32 172.23 152.15

Storage - Download - File (AsynchronousFileChannel)

BlobClient.downloadToFile(String filePath)

10KB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 372.89±16.77 330.76±9.64 349.68±15.30 327.35±10.44
10 3546.62±95.06 2556.30±58.30 3339.41±87.91 2923.97±61.21
30 5218.04±88.48 3146.28±51.33 5228.46±70.22 4163.25±22.00

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 395.39 347.16 373.64 344.24
10 3677.71 2621.75 3482.24 3055.24
30 5317.11 3217.97 5345.28 4202.16

20KB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 360.17±7.81 309.72±11.48 338.68±7.17 314.59±11.64
10 3350.94±83.49 2266.76±40.89 3142.80±53.74 2720.84±51.16
30 4665.73±21.79 2708.20±20.77 4688.23±87.03 3816.91±48.84

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 368.52 328.40 350.00 333.61
10 3480.82 2340.20 3198.75 2811.38
30 4699.20 2735.80 4850.35 3899.81

30KB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 333.52±11.41 301.06±10.36 324.04±13.88 303.61±5.99
10 3184.15±58.22 2119.82±41.38 2960.85±46.67 2534.05±38.65
30 4253.67±51.09 2474.71±27.45 4297.75±63.11 3525.29±49.75

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 354.30 318.97 347.65 312.04
10 3274.02 2169.56 3016.78 2585.35
30 4358.84 2504.59 4367.31 3626.41

1MB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 67.88±1.48 61.18±2.37 63.68±1.63 58.86±0.94
10 341.28±6.14 251.09±3.24 338.11±6.11 303.88±5.62
30 345.54±5.58 252.88±2.77 405.10±7.64 366.45±6.72

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 70.10 66.14 65.75 60.24
10 353.79 254.91 349.65 312.98
30 359.09 257.50 417.10 373.80

10MB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 15.62±0.15 11.38±0.15 14.59±0.35 13.20±0.36
10 34.98±0.59 24.56±0.41 41.66±1.01 38.01±0.57
30 33.34±0.40 23.66±0.22 43.98±0.57 38.99±0.66

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 15.85 11.56 15.05 13.74
10 35.89 25.26 42.91 39.17
30 33.90 24.00 44.67 39.81

Storage - Download - File (FileChannel)

BlobClient.downloadToFile(String filePath)

10KB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 383.84±6.76 347.76±5.99 364.60±4.94 338.24±8.00
10 3698.97±38.30 2610.57±31.59 3426.50±47.64 2975.50±31.50
30 5295.39±54.05 3194.34±29.67 5221.70±39.14 4190.38±50.41

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 394.33 357.39 374.31 351.73
10 3761.79 2650.05 3505.28 3010.62
30 5401.32 3245.29 5291.45 4279.19

1MB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 69.21±1.44 63.08±2.48 64.98±0.63 60.28±0.99
10 348.92±7.24 253.61±2.62 340.81±4.64 305.59±4.95
30 343.93±6.20 256.56±3.63 407.24±4.72 369.51±4.20

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 71.67 68.03 66.03 61.57
10 361.33 259.23 346.59 314.52
30 358.35 261.74 417.39 374.81

10MB

Avg throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 15.76±0.21 11.40±0.14 14.89±0.32 13.43±0.27
10 34.97±0.30 24.59±0.28 41.94±0.36 38.11±0.44
30 33.26±0.29 23.79±0.18 44.37±0.20 39.28±0.43

Max throughput (ops/s)

Concurrency | okhttp new | okhttp old | netty new | netty old
1 16.21 11.66 15.39 13.98
10 35.60 24.99 42.47 38.76
30 33.75 24.08 44.72 39.90

Profiles

Procedure

In order to get profiles, I created a simple app that repeats a fixed number of transactions with each perf fat jar. It was run on a VM against a real account so that the profiles capture traffic over HTTPS. The same has also been done with the Storage scenarios.

package com.azure.syncstack;

import com.azure.core.perf.PipelineSendTest;
import com.azure.core.perf.core.CorePerfStressOptions;
import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import java.io.IOException;

public class Main {

    public static void main(String[] args) {
        CorePerfStressOptions options = new CorePerfStressOptions();
        JCommander jc = new JCommander();
        jc.addCommand("pipelinesend", options);
        jc.parse(args);
        String parsedCommand = jc.getParsedCommand();
        if (parsedCommand == null || parsedCommand.isEmpty()) {
            throw new IllegalStateException("Not great");
        }

        System.out.println("=== Options ===");
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
            mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
            mapper.writeValue(System.out, options);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        PipelineSendTest pipelineSendTest = new PipelineSendTest(options);
        pipelineSendTest.globalSetupAsync().block();
        pipelineSendTest.setupAsync().block();

        int count = options.getCount();
        long startMillis = System.currentTimeMillis();
        System.out.println("Running " + count + " samples");
        for (int i = 0; i < count; i ++) {
            pipelineSendTest.run();
        }
        long endMillis = System.currentTimeMillis();
        System.out.println("Done in " + (endMillis - startMillis)/1000 + " sec");

        pipelineSendTest.cleanupAsync().block();
        pipelineSendTest.globalCleanupAsync().block();
    }
}
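The runs were then driven by a script along the lines of the following, which starts a Java Flight Recorder profile for every combination of input type, HTTP client, and sync mode: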
$binarydatatypes=@("bytes","file","stream")
$httpclients=@("netty","okhttp")
$syncasyncs=@("full-sync","sync-over-async")
$count=10000
$size=10485760

foreach ($binarydatatype in $binarydatatypes) {
    foreach ($httpclient in $httpclients) {
        foreach ($syncasync in $syncasyncs) {

java `
-XX:+FlightRecorder -XX:StartFlightRecording=settings=profile,filename=https-profile-$httpclient-$binarydatatype-$syncasync.jfr `
-cp "/home/azureuser/experiments/stress:azure-core-perf-$syncasync.jar" `
-Xms500m -Xmx500m com.azure.syncstack.Main pipelinesend `
--sync  --backend-type blobs `
--endpoint "REDACTED" `
--size $size `
--count $count `
--http-client $httpclient `
--binary-data-source $binarydatatype

        }
    }
}
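
The resulting .jfr recordings were then inspected for allocation and GC metrics; they can be opened in JDK Mission Control or summarized from the command line with the jfr tool that ships with recent JDKs, e.g. jfr summary https-profile-netty-bytes-full-sync.jfr.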

Results

Pipeline - Upload - Bytes

Metric | OkHttp new | OkHttp old | Netty new | Netty old
Allocation 112.01 GB 405.34 GB 2.72 GB 3.49 GB
GC Count 397 5,556 13 16
GC Total Time 0.41 s 9.7 s 0.04 s 0.04 s
Young GC Count 396 3,335 12 15
Old GC Count 1 2,221 1 1

Pipeline - Upload - File

Metric | OkHttp new | OkHttp old | Netty new | Netty old
Allocation 14.5 GB 281.18 GB 4.72 GB 10.88 GB
GC Count 53 14,031 20 202
GC Total Time 0.08 s 54.66 s 0.05 s 2.97 s
Young GC Count 52 7,027 19 3
Old GC Count 1 7,004 1 199

Pipeline - Upload - Stream

Metric | OkHttp new | OkHttp old | Netty new | Netty old
Allocation 14.5 GB 381.42 GB 4.86 GB 104.82 GB
GC Count 53 13,472 20 367
GC Total Time 0.08 s 45.61 s 0.06 s 0.32 s
Young GC Count 52 6,813 19 366
Old GC Count 1 6,659 1 1

Pipeline - Upload - Zero copy file transfer

This data comes from a different setup where plain HTTP has been used to transfer the file.

Metric | Netty new | Netty old
Allocation 0.25 GB 7.14 GB
GC Count 4 90
GC Total Time 0.02 s 10.02 s
Young GC Count 3 81
Old GC Count 1 9

Storage - Download - OutputStream - 10,000 x 10MB

Metric | OkHttp new | OkHttp old | Netty new | Netty old | Track1
Allocation 8.56 GB 116.42 GB 1.26 GB 102.87 GB 8.42 GB
GC Count 33 406 8 360 31
GC Total Time 0.07 s 0.39 s 0.03 s 0.33 s 0.06 s
Young GC Count 32 405 7 359 31
Old GC Count 1 1 1 1 0

Storage - Download - File - 10,000 x 10MB

Metric | OkHttp new | OkHttp old | Netty new | Netty old
Allocation 11.97 GB 153.09 GB 5.67 GB 106.31 GB
GC Count 45 538 25 374
GC Total Time 0.19 s 1.5 s 0.15 s 0.98 s
Young GC Count 44 537 24 373
Old GC Count 1 1 1 1
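
The throughput tables earlier in this document were produced with the stats.py script attached to this gist; both variants are included below. They parse the perf runner output and print the tables.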
#!/usr/bin/env python3
import sys
import re
import statistics

filename = sys.argv[1]

# The key is everything between the first and last token of a "Starting"/"Finished" log line,
# i.e. the test configuration (client, sync mode, parallel=..., size=...).
def get_key(line):
    splitted = line.split(" ")
    subset = splitted[1:len(splitted)-1]
    return " ".join(subset)

compl_counter = 0
results_by_key = {}
with open(filename, "r") as file:
    for fline in file:
        line = fline.rstrip()
        if line.startswith("Completed"):
            if compl_counter == 0:
                # Skip the first "Completed" line of each run.
                compl_counter += 1
            else:
                # print(line)
                match = re.search(r"\(((\d+,)?\d+\.\d+)", line)
                if match is not None:
                    tps = match.group(1)
                    tps = tps.replace(",", "")
                    # print(tps)
                    if key not in results_by_key:
                        results_by_key[key] = []
                    results_by_key[key].append(float(tps))
        elif line.startswith("Finished"):
            key = get_key(line)
            # print(key)
            compl_counter = 0
        elif line.startswith("Starting"):
            key = get_key(line)
            if key not in results_by_key:
                results_by_key[key] = []
            # print(key)
            compl_counter = 0

# print(results_by_key)
means_by_key = {}
stdev_by_key = {}
max_by_key = {}
sizes = set()
clients = set()
syncasyncs = set()
binarydatatypes = set()
parallels = set()
testname = ""
for key in results_by_key:
    results = results_by_key[key]
    print(key + " (" + str(len(results)) + ")")
    # print(results)
    results.sort(reverse=True)
    # print(results)
    top10 = results[0:10]
    # print(top10)
    if len(top10) > 0:
        mean = "%.2f" % statistics.mean(top10)
    else:
        mean = "NaN"
    if len(top10) > 1:
        stdev = "%.2f" % statistics.stdev(top10)
    else:
        stdev = "NaN"
    if len(top10) > 0:
        max = "%.2f" % top10[0]
    else:
        max = "NaN"
    # print("avg " + mean + " +/- " + stdev)
    print("avg " + mean + "&pm;" + stdev)
    print("max " + max)
    means_by_key[key] = mean
    stdev_by_key[key] = stdev
    max_by_key[key] = max
    segments = key.split(" ")
    testname = segments[0]
    clients.add(segments[1])
    syncasyncs.add(segments[2])
    parallels.add(segments[3])
    sizes.add(segments[4])
    print()

sizes = sorted(sizes, key=lambda e: int(e.split("=")[1]))
binarydatatypes = sorted(binarydatatypes)
parallels = sorted(parallels)
clients = sorted(clients, reverse=True)
syncasyncs = sorted(syncasyncs)

# Build the markdown table header, mapping branch labels to "new"/"old" column names.
header = "| Concurrency |"
subheader = "|--|"
for client in clients:
    for syncasync in syncasyncs:
        header += " " + client + " <br> "
        if syncasync == "full-sync":
            header += "new"
        elif syncasync == "new-main":
            header += "new"
        elif syncasync == "sync-over-async":
            header += "old"
        elif syncasync == "old-main":
            header += "old"
        else:
            header += syncasync
        header += " |"
        subheader += "--|"
header += "\n" + subheader

print(testname)
for size in sizes:
    print()
    print("### " + size.split("=")[1])
    print("#### Avg throughput (ops/s)")
    print(header)
    for parallel in parallels:
        line = "| " + parallel.split("=")[1] + " |"
        for client in clients:
            for syncasync in syncasyncs:
                key = testname + " " + client + " " + syncasync + " " + parallel + " " + size
                if key in means_by_key:
                    line += " " + means_by_key[key] + "&pm;" + stdev_by_key[key] + " |"
                else:
                    line += " |"
        print(line)
    print("#### Max throughput (ops/s)")
    print(header)
    for parallel in parallels:
        line = "| " + parallel.split("=")[1] + " |"
        for client in clients:
            for syncasync in syncasyncs:
                key = testname + " " + client + " " + syncasync + " " + parallel + " " + size
                if key in max_by_key:
                    line += " " + max_by_key[key] + " |"
                else:
                    line += " |"
        print(line)
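
The second variant below differs only in that it also groups results by the binary data type segment of the key (e.g. bytes, file, stream) and prints a section per data type.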
#!/usr/bin/env python3
import sys
import re
import statistics

filename = sys.argv[1]

def get_key(line):
    splitted = line.split(" ")
    subset = splitted[1:len(splitted)-1]
    return " ".join(subset)

compl_counter = 0
results_by_key = {}
with open(filename, "r") as file:
    for fline in file:
        line = fline.rstrip()
        if line.startswith("Completed"):
            if compl_counter == 0:
                compl_counter += 1
            else:
                # print(line)
                match = re.search(r"\(((\d+,)?\d+\.\d+)", line)
                if match is not None:
                    tps = match.group(1)
                    tps = tps.replace(",", "")
                    # print(tps)
                    if key not in results_by_key:
                        results_by_key[key] = []
                    results_by_key[key].append(float(tps))
        elif line.startswith("Finished"):
            key = get_key(line)
            # print(key)
            compl_counter = 0
        elif line.startswith("Starting"):
            key = get_key(line)
            if key not in results_by_key:
                results_by_key[key] = []
            # print(key)
            compl_counter = 0

# print(results_by_key)
means_by_key = {}
stdev_by_key = {}
max_by_key = {}
sizes = set()
clients = set()
syncasyncs = set()
binarydatatypes = set()
parallels = set()
testname = ""
for key in results_by_key:
    results = results_by_key[key]
    print(key + " (" + str(len(results)) + ")")
    # print(results)
    results.sort(reverse=True)
    # print(results)
    top10 = results[0:10]
    # print(top10)
    if len(top10) > 0:
        mean = "%.2f" % statistics.mean(top10)
    else:
        mean = "NaN"
    if len(top10) > 1:
        stdev = "%.2f" % statistics.stdev(top10)
    else:
        stdev = "NaN"
    if len(top10) > 0:
        max = "%.2f" % top10[0]
    else:
        max = "NaN"
    # print("avg " + mean + " +/- " + stdev)
    print("avg " + mean + "&pm;" + stdev)
    print("max " + max)
    means_by_key[key] = mean
    stdev_by_key[key] = stdev
    max_by_key[key] = max
    segments = key.split(" ")
    testname = segments[0]
    binarydatatypes.add(segments[1])
    clients.add(segments[2])
    syncasyncs.add(segments[3])
    parallels.add(segments[4])
    sizes.add(segments[5])
    print()

sizes = sorted(sizes, key=lambda e: int(e.split("=")[1]))
binarydatatypes = sorted(binarydatatypes)
parallels = sorted(parallels)
clients = sorted(clients, reverse=True)
syncasyncs = sorted(syncasyncs)

header = "| Concurrency |"
subheader = "|--|"
for client in clients:
    for syncasync in syncasyncs:
        header += " " + client + " <br> "
        if syncasync == "full-sync":
            header += "new"
        elif syncasync == "sync-over-async":
            header += "old"
        else:
            header += syncasync
        header += " |"
        subheader += "--|"
header += "\n" + subheader

for binarydatatype in binarydatatypes:
    print()
    print("## " + binarydatatype)
    for size in sizes:
        print()
        print("### " + size.split("=")[1])
        print("#### Avg")
        print(header)
        for parallel in parallels:
            line = "| " + parallel.split("=")[1] + " |"
            for client in clients:
                for syncasync in syncasyncs:
                    key = testname + " " + binarydatatype + " " + client + " " + syncasync + " " + parallel + " " + size
                    line += " " + means_by_key[key] + "&pm;" + stdev_by_key[key] + " |"
            print(line)
        print("#### Max")
        print(header)
        for parallel in parallels:
            line = "| " + parallel.split("=")[1] + " |"
            for client in clients:
                for syncasync in syncasyncs:
                    key = testname + " " + binarydatatype + " " + client + " " + syncasync + " " + parallel + " " + size
                    line += " " + max_by_key[key] + " |"
            print(line)

alzimmermsft commented Apr 7, 2022

Notes from meeting:

Naming decision: converge on the usage of Sync or Synchronously everywhere.

Think about removing setContent and getContent in HttpRequest and using getBodyAsBinaryData and setBody(BinaryData) to prevent confusion over whether getBody or getContent should be used. The same concept applies to HttpResponse.

Potentially reduce the number of sendSynchronously overloads to only one, taking either HttpPipelineCallContext or (HttpRequest, Context). No matter what, the HttpRequest-only API should be removed.

Determine the cost and impact of making a breaking change to HttpPipelinePolicy by making processSynchronously a non-default interface API. Generally, SDK authors will be the only people implementing the interface, and we can own updating all instances in our code base. For the few customers we've worked with on creating their own policies, we may be able to notify them about this breaking change and help them work through adding a new implementation.

For policies that are able to do so, we should look into having them use HttpPipelineSynchronousPolicy if it isn't too breaking and if the policy processes synchronously.
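
To make the last two points concrete, here is a rough sketch (using the prototype's processSynchronously naming; not a committed API shape) of what HttpPipelinePolicy could look like once the synchronous method is no longer a default:

import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import reactor.core.publisher.Mono;

// Sketch only: with both methods abstract, every policy author has to supply an explicit
// synchronous path instead of silently inheriting a block()-based default.
public interface HttpPipelinePolicySketch {
    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next);

    HttpResponse processSynchronously(HttpPipelineCallContext context, HttpPipelineNextPolicy next);
}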
