Load testing event fan-out scenario with Azure Event Hubs SDK for Java

This gist contains performance measurements for event fan-out using the Azure Event Hubs SDK for Java.

While all the numbers are specific to this environment, they can still be used to estimate the performance to expect from applications that use Event Hubs and to understand how different factors and configurations affect it. In short, the information in this gist can be used to explore optimization and performance tuning strategies.

Scenario

  1. The producer sends events to the source Event Hubs namespace.
  2. Consumers receive events from that namespace:
    • Upon each batch, the consumer fans out the events in that batch to N partitions of another Event Hubs namespace.
    • If all events are sent successfully, the consumer updates the checkpoint.
    • If sending to any of the partitions fails, the processor does not checkpoint; it fails and is restarted by the SDK.


We'll also inject intentional processing failures at a very low rate to simulate a processor losing ownership of a partition and taking it over again.

We're using throughput (the number of events received and forwarded per second) as the main metric, but we'll also monitor other parameters.

Code

The full test code can be found here. The essential part is the batch processing callback:

// re-group events in the batch into M batches to forward to M partitions in the destination hub.
Map<String, List<EventData>> batches = regroupEvents(batchContext);

// send all batches in parallel
Flux.fromStream(batches.entrySet().stream().filter(e -> e.getValue() != null && !e.getValue().isEmpty()))
    .flatMap(entry -> {
        // simulate some failures
        if (ThreadLocalRandom.current().nextDouble() < processorFailureRatio) {
             return Mono.error(new SimulatedFailure());
        }

        // send each batch to the corresponding partition (send calls createBatch for each list of events internally)
        return producerClient.send(entry.getValue(), new SendOptions().setPartitionKey(entry.getKey()));
    })
    .parallel(batches.entrySet().size(), 1) // send to all partitions in parallel
    .runOn(Schedulers.boundedElastic()) // make sure you have enough threads in the pool to execute all of them (this pool is configured with reactor.schedulers.defaultBoundedElasticSize VM option)
    .then()
    .block();

// when all batches are sent successfully, update checkpoint.
batchContext.updateCheckpoint();

If sending any batch fails, we let the error propagate back to the Event Hubs processor client - it will stop processing the corresponding partition. The partition will be re-assigned after a timeout, and processing will resume from the last checkpoint, resulting in some duplication that consumers should expect.

Configuration

The following parameters are configurable, and we'll check how they affect performance:

  • Event Hubs SDK version: 5.17.1 or 5.18.0
  • Number of CPU cores: 1 or 2
  • Available memory: 1GB or 2GB
  • Fan-out factor (how many partitions events are forwarded to): 1, 32, 64, 100
  • Event payload size: 16 bytes, 1 KB, 8 KB
  • Prefetch count: 1, 500, 1000, 2000 (per partition)
  • Processor batch size: 32, 64, 128, 256 events

Environment

We're using the Azure SDK stress test infrastructure to run the tests and measure performance, and we leverage OpenTelemetry to report metrics from the Event Hubs SDK and the tests.

  • Java 21
  • AKS cluster with resource limits on containers
    • The test application in the first container generates events.
    • The test application in the second container receives and forwards events. It has 2 processor client instances working in parallel to emulate load balancing.
    • There is only one instance of the sender application and one instance of the forwarding application.
  • Event Hubs namespace
    • Premium tier.
    • 2 throughput units.
    • Both hubs (the source and the one we forward to) are in the same namespace.
    • The source hub has 32 partitions.
    • The forwarding hub has 32 partitions for most tests, and 100 for tests with higher fan-out factors.

Results

Event Hubs SDK versions

The throughput observed with version 5.18.0 is almost twice as high as what version 5.17.1 can handle. This is a result of improved memory consumption, which lowers memory pressure and reduces garbage collection frequency.


Parameters: cores: 1; memory: 1GB; fan-out factor: 32; processor batch size: 128; payload size: 16 bytes; prefetch count: 1

| Scenario | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
|---|---|---|---|---|
| 5.17.1 | 6290 | 6092 | 98.4 | 1 |
| 5.18.0 | 12079 | 11859 | 97.3 | 1 |

If we look at time spent in GC, the application running 5.17.1 spends around 70% of its time in GC across all generations, while with version 5.18.0 it drops to ~10%.

We're going to use version 5.18.0 in all other tests.

Available CPU and memory

Here we see that the bare-bones forwarding scenario scales well vertically with CPU count. Increasing memory alone results in only a minor increase in throughput. In real-life scenarios, the processing callback is more complicated and would likely need more memory and CPU.


Parameters: fan-out factor: 32; processor batch size: 128; payload size: 16 bytes; prefetch count: 1

| Scenario | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
|---|---|---|---|---|
| 1 core, 1GB | 12079 | 11859 | 97.3 | 1 |
| 1 core, 2GB | 12506 | 12262 | 96.7 | 1.85 |
| 2 cores, 1GB | 22685 | 22287 | 81.6 | 1 |
| 2 cores, 2GB | 25411 | 25043 | 84.9 | 1.72 |

We're going to do the rest of the testing on 1 CPU, 1GB of memory.

Fan-out factor

Throughput slowly decreases as we increase the fan-out factor.


Parameters: processor batch size: 128; payload size: 16 bytes; prefetch count: 1

| Fan-out factor | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
|---|---|---|---|---|
| 1 | 16144 | 16119 | 58.1 | 1 |
| 32 | 12079 | 11859 | 97.3 | 1 |
| 64 | 9680 | 9448 | 95.5 | 1 |
| 100 | 8887 | 8647 | 94.8 | 1 |

The bigger the fan-out factor, the more batches we need to send. When sending batches in parallel, we need to make sure there are enough threads available for it. Assuming a processor instance owns N source partitions and forwards to M other partitions, we'd need up to N x M threads, and each thread would also need about 1MB of memory for its stack.
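
As a rough illustration of this sizing, a dedicated bounded-elastic scheduler can be created for the forwarding sends instead of relying on the global default pool. This is a minimal sketch; the partition counts, queue capacity, and scheduler name are illustrative assumptions, not values taken from the test code.

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

int ownedPartitions = 16;  // source partitions owned by this processor instance (assumed)
int fanOutFactor = 32;     // destination partitions we forward to (assumed)

Scheduler sendScheduler = Schedulers.newBoundedElastic(
    ownedPartitions * fanOutFactor, // thread cap: up to N x M concurrent sends
    100_000,                        // max queued tasks before the scheduler rejects work
    "eh-forwarder");                // thread name prefix

// Pass sendScheduler to runOn(...) instead of Schedulers.boundedElastic(), or raise the
// global default pool size with -Dreactor.schedulers.defaultBoundedElasticSize=<N>.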

By increasing the fan-out factor, we also increase the latency of the overall operation - it now matches the latency of the slowest of the M send calls plus any overhead the parallelization introduces. Moreover, if the chance of a transient error when sending one batch is 0.001%, then sending 100 batches increases the chance of a transient error for the overall operation to roughly 100 × 0.001% = 0.1%.
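
This estimate assumes the M send calls fail independently, each with probability p: the chance that at least one of them fails is 1 - (1 - p)^M ≈ M · p for small p, which for p = 0.001% and M = 100 gives roughly 0.1%.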

Note: the measurements above show low CPU usage for fan-out factor = 1 because there were not enough events in the source hub and the consumer was frequently idle waiting for more events, so even higher throughput could be achieved in this edge case.

Processor batch size

In the forwarding scenario, the processor batch size is one of the key parameters. Since we keep the fan-out factor constant, a bigger processor batch means bigger batches to forward. Parallel send calls are the most expensive operation our forwarder does, but their performance degrades only slightly as batch size grows. As a result, throughput grows significantly along with the processor batch size.
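
The processor batch size (and the maximum time to wait for a full batch) is configured when registering the batch callback on the processor builder. A minimal sketch, assuming a hypothetical forwardAndCheckpoint helper that wraps the callback from the Code section; the wait time is an illustrative value:

import java.time.Duration;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;

EventProcessorClientBuilder builder = new EventProcessorClientBuilder()
    // ... connection string, consumer group, checkpoint store, processError, etc.
    .processEventBatch(
        batchContext -> forwardAndCheckpoint(batchContext), // hypothetical helper: the forwarding callback above
        128,                       // maxBatchSize: up to 128 events per callback invocation
        Duration.ofSeconds(30));   // maxWaitTime: dispatch a partial batch after 30 seconds (illustrative)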


Parameters: fan-out factor: 32; payload size: 1KB; prefetch count: 1

| Processor batch size | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
|---|---|---|---|---|
| 32 | 5420 | 5376 | 95.7 | 1 |
| 64 | 6605 | 6544 | 93.1 | 1 |
| 128 | 9991 | 9924 | 95 | 1 |
| 256 | 14297 | 14228 | 95.7 | 1 |

Event payload size

The bigger the payload, the lower the throughput and the more memory is needed.


Parameters: fan-out factor: 32; processor batch size: 128; prefetch count: 1

| Event size | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
|---|---|---|---|---|
| 16 bytes | 12079 | 11859 | 97.3 | 1 (out of 1GB available) |
| 1 KB | 9991 | 9924 | 95 | 1 (out of 1GB available) |
| 8 KB | 5620 | 5476 | 99.8 | 2.5 (out of 3GB available) |

Prefetch

Prefetch count does not affect the throughput of the forwarding scenario under the conditions we tested. In our case, the Event Hubs resource and the test application are co-located in the same Azure data center, resulting in a very fast and reliable network connection. We also have a relatively complex processing callback. Combined, these factors make the time and resources needed to receive messages negligible.

It's always a good idea to experiment and find the optimal prefetch count for your specific scenario and setup.


Parameters: fan-out factor: 32; processor batch size: 128; payload size: 16 bytes

| Prefetch count per partition | Received events per sec | Forwarded events per sec | CPU usage, % | Memory usage, GB |
|---|---|---|---|---|
| 1 | 12079 | 11859 | 97.3 | 1 |
| 500 | 11869 | 11646 | 97.2 | 1 |
| 1000 | 12016 | 11789 | 97.1 | 1 |
| 2000 | 11659 | 11400 | 97.2 | 1 |

Increasing the amount of available memory has little to no effect on the throughput in this scenario as well (results are omitted for brevity).

Note: the prefetch count applies to each partition. When configuring it to relatively high values, make sure to configure enough memory. If a processor owns P partitions, it would need to keep an extra P * prefetchCount events in memory on the receiving side alone. The amount of heap memory an event uses can be estimated as 1KB + payload size. We recommend keeping the prefetch count low for Event Hubs SDK versions 5.18.0 and lower to avoid excessive prefetching.
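
Prefetch is configured on the processor builder. A minimal sketch, with the memory estimate from above spelled out; the partition count and payload size in the comment are illustrative assumptions:

import com.azure.messaging.eventhubs.EventProcessorClientBuilder;

// Illustrative estimate: a processor owning 16 partitions with prefetchCount = 1000 and ~1 KB
// payloads keeps roughly 16 * 1000 * (1 KB + 1 KB) ≈ 32 MB of events in memory just for
// prefetching, before any processing happens.
EventProcessorClientBuilder builder = new EventProcessorClientBuilder()
    // ... connection string, consumer group, checkpoint store, callbacks, etc.
    .prefetchCount(500); // applies per partition; keep it modest unless memory is sized accordingly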

Summary

As we can see, the performance of the forwarding scenario depends on several important factors:

  • available resources and their utilization
  • event payload size and max batch size on the processor
  • fan-out factor

Make sure to use the latest version of the Event Hubs SDK. Refer to the next section for additional suggestions on things to check and monitor.

Monitoring and tuning fan-out scenario performance

We used throughput - the number of successfully forwarded events per second - as the key indicator of performance in this scenario. We also kept an eye on CPU and memory to check that we use all available resources and observe the peak performance numbers for a given configuration. In practice, there can be other important metrics that don't follow throughput directly, such as end-to-end latency or duplication rate, which we need to measure and take into account when tuning performance.

Such metrics may be correlated with throughput to some extent, but not fully. For example, we could achieve high throughput with a very high processor batch size. However, if a transient issue occurs when forwarding a subset of events, the whole batch won't be checkpointed - it will be re-processed after a delay, resulting in some events being forwarded twice.

Another problem with a huge batch size is that the processor waits for that number of events (or a configurable amount of time, whichever comes first) before dispatching the batch to the application. During periods of lower load, this could mean that processors spend time waiting for events to arrive, increasing end-to-end latency.

It's important to identify and monitor key indicators for your system. In addition to these indicators, we need other observability signals to understand what went wrong when something (inevitably) fails. Let's list the signals that are important to monitor for the forwarding scenario.

Batch processing duration and error rate

The processing error rate is one of the most important signals for Event Hubs consumers - if an exception is not handled within the processing callback, it triggers the following sequence of events:

  • The exception is reported to the processError callback.
  • The processor client stops processing this partition and closes the underlying connection to Event Hubs.
  • This or another processor instance takes ownership of the partition after partitionOwnershipExpirationInterval (defaults to 2 minutes).
  • The processor re-establishes the connection and starts processing events in that partition from the last recorded checkpoint.

So, if we can't forward a batch to the final destination:

  • we should not checkpoint the source batch
  • we should (re)throw an exception and let the processor client stop processing this partition - if we swallow the exception and let the processor continue, it will process the next batch. This would break ordering, but more importantly, the next batch could succeed, resulting in a checkpoint update. The first, faulty batch would then never be processed again.

Once we stop processing this partition, it will stay idle for the configured partitionOwnershipExpirationInterval. It could make sense to reduce this interval - refer to Duration and error rate of checkpoint calls below for considerations.

It could be useful to record some telemetry in the processor client's processPartitionInitialization and processPartitionClose callbacks, along with the partitionId, to see how frequently this happens.
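
A minimal sketch of such telemetry hooks; the logger calls stand in for whatever metrics or tracing backend you use:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;

Logger logger = LoggerFactory.getLogger("partition-lifecycle");

EventProcessorClientBuilder builder = new EventProcessorClientBuilder()
    // ... connection string, consumer group, checkpoint store, processEventBatch, etc.
    .processPartitionInitialization(initContext ->
        logger.info("claimed partition {}", initContext.getPartitionContext().getPartitionId()))
    .processPartitionClose(closeContext ->
        logger.info("closed partition {}, reason: {}",
            closeContext.getPartitionContext().getPartitionId(), closeContext.getCloseReason()))
    .processError(errorContext ->
        logger.error("error on partition {}",
            errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()));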


It's also important to monitor the latency of the overall processing operation, which includes the forwarding and checkpointing calls. Usually we need to understand the latency distribution and check P50, P95, or other higher percentiles. Check out this excellent talk by Gil Tene on understanding application responsiveness and avoiding common pitfalls when measuring latency.

Root-causing and improving the long tail of the latency distribution usually brings more benefit in distributed applications than local performance optimizations that affect only the median latency.

To investigate the sources of high latency, we'd need to see the durations of the underlying operations and their distributions. The Event Hubs SDK is instrumented with OpenTelemetry: it supports distributed tracing and emits metrics in experimental mode.


Duration and error rate of re-sending calls

Forwarding the re-grouped events to the final destination is probably the biggest contributor to processing latency. As we've seen, the fan-out factor is the key driver here. Also, since we send the forwarded batches in parallel, we're adding another source of potential issues related to threading and synchronization.

If individual send calls are fast and reliable, but the overall operation is slow, make sure to check and benchmark the parallelization code.

If some of the individual send calls take very long and result in timeouts, these may be caused by transient network issues. Timed-out calls are retried by the Event Hubs producer client, and in this case the timeout value and back-off intervals have the highest impact on the overall latency.

If your connection to Event Hubs is usually fast and reliable (for example, if you host the application in Azure in the same region as the Event Hubs resource), you can try reducing the try timeout to a few seconds:

new EventHubClientBuilder()
    .retryOptions(new AmqpRetryOptions().setTryTimeout(Duration.ofSeconds(5)))
    .eventHubName(options.getEventHubsEventHubName())
    ...

The default try timeout is 1 minute, but with a reliable connection it usually does not make sense to wait for more than a few seconds for the service to respond. If it does not respond soon, it's likely to never respond, and waiting longer is not helpful.

Issues like timeouts may also be caused by high CPU usage, an overloaded IO thread, or threading issues such as synchronization or starvation. If you see a high rate of timeouts, make sure to check CPU and memory on the corresponding consumer instances at that time.

In case of low CPU utilization combined with low throughput (when you expect the system to do more), check:

  • Do you have adequate thread pool sizes? When parallelizing event forwarding, we need up to fanOutFactor threads per owned partition. Make sure to configure thread pools for it; here's the Project Reactor thread pool documentation describing configuration options and defaults.
  • Profile the application and check if some threads are much busier than others. Event Hubs handles IO operations for a specific connection on dedicated threads following the reactor-executor-* naming pattern. If these threads are very busy, consider creating multiple Event Hubs producer or processor clients to publish and/or process events and spread the work between them. Or, if you share a connection between producer clients, stop doing so (see the sketch below).
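
A minimal sketch of spreading publishing across a few independent producer clients, each with its own connection; the client count and the connectionString/eventHubName/partitionKey variables are illustrative assumptions:

import java.util.ArrayList;
import java.util.List;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;

// Without shareConnection(), each client built from its own builder gets its own AMQP
// connection, so IO work is spread across separate reactor-executor-* threads.
int producerCount = 4; // illustrative
List<EventHubProducerAsyncClient> producers = new ArrayList<>();
for (int i = 0; i < producerCount; i++) {
    producers.add(new EventHubClientBuilder()
        .connectionString(connectionString, eventHubName) // assumed to be defined elsewhere
        .buildAsyncProducerClient());
}

// Spread batches across producers, e.g. by destination partition key (assumed to be in scope):
EventHubProducerAsyncClient producer =
    producers.get(Math.floorMod(partitionKey.hashCode(), producers.size()));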

Duration and error rate of checkpoint calls

Checkpointing involves a network call that might be another source of latency issues and errors. Consider reducing the timeout as an optimization - refer to the Azure Storage Blob samples on how to configure HTTP client timeouts, and to the Event Hubs documentation for general guidance on using Azure Storage Blobs as a checkpoint store.
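
A minimal sketch of building a blob checkpoint store with a tighter HTTP response timeout; the connection string, container name, and timeout value are illustrative assumptions:

import java.time.Duration;
import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;

HttpClient httpClient = new NettyAsyncHttpClientBuilder()
    .responseTimeout(Duration.ofSeconds(10)) // fail checkpoint calls faster than the default (illustrative)
    .build();

BlobContainerAsyncClient containerClient = new BlobContainerClientBuilder()
    .connectionString(storageConnectionString) // assumed to be defined elsewhere
    .containerName("checkpoints")              // illustrative container name
    .httpClient(httpClient)
    .buildAsyncClient();

BlobCheckpointStore checkpointStore = new BlobCheckpointStore(containerClient);
// pass checkpointStore to EventProcessorClientBuilder.checkpointStore(...)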

Note that the same storage account is used for load balancing, to claim and preserve ownership of partitions. If checkpoint latency or error rate is high, it most likely means that processors can't preserve their ownership and lose it to other processor instances, causing a higher event duplication rate.

You can control how frequently ownership is renewed with the loadBalancingUpdateInterval setter on the processor client builder. Make sure to keep the load balancing update interval much smaller than partitionOwnershipExpirationInterval.
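
Both intervals are set on the processor builder. A minimal sketch with illustrative values that keep the update interval well below the ownership expiration interval:

import java.time.Duration;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;

EventProcessorClientBuilder builder = new EventProcessorClientBuilder()
    // ... connection string, consumer group, checkpoint store, callbacks, etc.
    .loadBalancingUpdateInterval(Duration.ofSeconds(10))           // how often ownership is renewed (illustrative)
    .partitionOwnershipExpirationInterval(Duration.ofSeconds(60)); // how long an idle partition stays claimed (illustrative)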

Resource utilization

It's common to keep resource limits low in containerized environments and scale them up dynamically, so high resource utilization can be healthy (unless it's unusually high).

Memory

One of the important things to check when running high-scale Java applications is memory consumption and the number of application/container restarts. If the application dies with a Java out-of-memory error, or the container runtime kills it (because it requests more memory than it is allowed), it's a good signal that either there is a memory leak or the application just needs more memory to function. In both cases it's a good idea to get a memory dump (make sure to enable one, for example with the -XX:+HeapDumpOnOutOfMemoryError VM option) and investigate what's taking up all the memory. From the Event Hubs perspective, you might need to decrease the processor batch size and prefetch count to reduce the number of events kept in memory.

Check how much time is spent in garbage collection - if there is not enough memory, the application can waste a big chunk of CPU time on GC. Optimizing memory usage or increasing the amount of available memory can significantly improve performance. If you see that memory consumption grows over time despite garbage collection attempts to clean up heap space, it's likely a sign of a memory leak. Taking several heap dumps and comparing them should show which objects retained heap space over time and what's holding them in memory.

CPU

Relatively high CPU usage in messaging scenarios is usually a good signal that workers are well utilized. Since the Event Hubs processor client comes with backpressure, we don't need to protect workers from unexpected bursts of load. Still, it's a good idea to understand what's causing high CPU usage. Profiling your application during development, or in production with a continuous profiler, should show any expensive operations on the hot path and is a great input for local optimizations. Make sure to also check time spent in garbage collection.

Low CPU utilization combined with low throughput (when you expect the system to do more) is a signal of synchronization, locking, or a similar issue. From the Event Hubs side, check:

  • How many partitions are owned by the worker instance? Are there enough events to process?
  • How big are processor batches typically, compared to the configured maximum batch size? If they are smaller than the maximum, reduce the max wait time so the processor waits less for a full batch.
  • Check if other recommendations in Duration and error rate of re-sending calls apply.

Rate of network issues

Some level of network issues is expected in a high-scale distributed application, and Azure SDKs should handle and retry them. In some cases network issues last for prolonged periods of time and the SDK exhausts all retries. The processor client, however, does not stop trying to receive events until it's stopped or disposed. If the connection terminates or a response indicating an error is received, it re-establishes the connection. The processor client retries all issues, including authorization issues or missing-entity errors, so that if an entity was deleted by mistake, it can be restored on the Event Hubs service side without the need to restart the application.

You can use metrics emitted by the SDK to check the number of transport-related errors - check out the experimental SDK metrics to monitor transport-level issues.

Check out the Event Hubs troubleshooting guide for more details on how to investigate connectivity issues.

Event Hubs service-side metrics

As the load in your application grows, make sure to check whether Event Hubs is scaled adequately. Refer to the Monitor Azure Event Hubs and Azure Event Hubs quotas and limits articles for details.

Raw results for all test runs:

| Scenario | Received events per sec | Forwarded events per sec | Average CPU usage, % | Memory usage, GB | Cores available | Memory available, GB | Total events received | Total events forwarded | Duration, sec |
|---|---|---|---|---|---|---|---|---|---|
| **CPU and memory** |  |  |  |  |  |  |  |  |  |
| 1 core, 1GB | 12079 | 11859 | 97.3 | 1 | 1 | 1 | 43562049 | 42770482 | 3606.508 |
| 1 core, 2GB | 12506 | 12262 | 97 | 1.85 | 1 | 2 | 45023312 | 44145423 | 3600.222 |
| 2 cores, 1GB | 22685 | 22287 | 82 | 1 | 2 | 1 | 81793202 | 80356658 | 3605.602 |
| 2 cores, 2GB | 25411 | 25043 | 85 | 1.72 | 2 | 2 | 91620739 | 90295696 | 3605.624 |
| **SDK version** |  |  |  |  |  |  |  |  |  |
| 5.18.0 | 12079 | 11859 | 97.3 | 1 | 1 | 1 | 43562049 | 42770482 | 3606.508 |
| 5.17.1 | 6290 | 6092 | 98.4 | 1 | 1 | 1 | 22685311 | 21970568 | 3606.409 |
| **Prefetch count** |  |  |  |  |  |  |  |  |  |
| 1 | 12079 | 11859 | 97.3 | 1 | 1 | 1 | 43562049 | 42770482 | 3606.508 |
| 500 | 11869 | 11646 | 97.2 | 1 | 1 | 1 | 42806163 | 42002095 | 3606.699 |
| 1000 | 12016 | 11789 | 97.1 | 1 | 1 | 1 | 43342193 | 42524465 | 3606.994 |
| 2000 | 11659 | 11400 | 97.2 | 1 | 1 | 1 | 42050706 | 41118254 | 3606.853 |
| 1000 | 11807 | 11650 | 96 | 1.86 | 1 | 2 | 42573149 | 42005741 | 3605.608 |
| 2000 | 12269 | 12022 | 96.4 | 1.86 | 1 | 2 | 44251113 | 43358219 | 3606.712 |
| 2000 | 21769 | 20528 | 84.6 | 1.9 | 2 | 2 | 19719872 | 18596176 | 905.882 |
| **Fan-out factor** |  |  |  |  |  |  |  |  |  |
| 1 | 16144 | 16119 | 58.1 | 1 | 1 | 1 | 58209562 | 58121856 | 3605.711 |
| 32 | 12079 | 11859 | 97.3 | 1 | 1 | 1 | 43562049 | 42770482 | 3606.508 |
| 64 | 9680 | 9448 | 95.5 | 1 | 1 | 1 | 34907263 | 34074047 | 3606.295 |
| 100 | 8887 | 8647 | 94.8 | 1 | 1 | 1 | 32050429 | 31185632 | 3606.422 |
| **Message size** |  |  |  |  |  |  |  |  |  |
| 16 bytes | 12079 | 11859 | 97.3 | 1 | 1 | 1 | 43562049 | 42770482 | 3606.508 |
| 1KB | 9991 | 9924 | 95 | 1 | 1 | 1 | 36026181 | 35784968 | 3606.022 |
| 8KB (needs 3GB of memory) | 5620 | 5476 | 99.8 | 2.5 | 1 | 3 | 10158160 | 9897744 | 1807.579 |
| **Processor batch size** |  |  |  |  |  |  |  |  |  |
| 32 | 5420 | 5376 | 95.7 | 1 | 1 | 1 | 19543598 | 19383927 | 3605.725 |
| 64 | 6605 | 6544 | 93.1 | 1 | 1 | 1 | 23817309 | 23597654 | 3606.172 |
| 128 | 9991 | 9924 | 95 | 1 | 1 | 1 | 36026181 | 35784968 | 3606.022 |
| 256 | 14297 | 14228 | 95.7 | 1 | 1 | 1 | 51555123 | 51309278 | 3606.097 |