@forki
Last active May 16, 2022 17:06
// DEFINE PROCESSOR
type MyEventProcessorClient<'a>(blobContainer:BlobContainerClient,consumerGroup:string,connectionString:string,eventHubName:string,logger : ILogger<'a>,partitionID:string) =
    inherit EventProcessorClient(blobContainer,consumerGroup,connectionString,eventHubName)

    override __.ClaimOwnershipAsync(partitions:System.Collections.Generic.IEnumerable<_>, token:CancellationToken) = task {
        return partitions |> Seq.filter (fun p -> p.PartitionId = partitionID)
    }

// USAGE
let processor =
    new MyEventProcessorClient<Worker>(
        blobContainer,
        consumerGroup,
        eventHubsConnectionKey.Force(),
        "myevents",
        _logger,
        string partition)

processor.add_ProcessEventAsync(fun args ->
    task {
        let text = Encoding.UTF8.GetString(args.Data.EventBody.ToArray())
        let message = Newtonsoft.Json.JsonConvert.DeserializeObject<MyEvent> text
        _logger.LogInformation($"Partition: {partition} {args.Data.PartitionKey}")
        try
            do! EventHandler.run _logger message
        with
        | exn -> _logger.LogInformation($"Error: {exn.Message}")
        do! args.UpdateCheckpointAsync(stoppingToken)
    } :> Task
)

processor.add_ProcessErrorAsync(fun args ->
    task {
        _logger.LogInformation($"ProcessError: {args.Exception.Message}")
    } :> Task
)
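
The gist wires up the handlers but never shows the processor being started. A minimal sketch of the missing start/stop calls, assuming the code above lives inside a hosted worker's ExecuteAsync and reuses the same stoppingToken (none of this is in the original gist):

// Sketch only: run the processor until the host signals shutdown.
do! processor.StartProcessingAsync(stoppingToken)
// ... keep the worker alive until shutdown is requested ...
do! processor.StopProcessingAsync()
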
@jsquire commented May 16, 2022

Based on the code, I think I understand the root of the additional cost. My F# is only intermediate and a bit rusty, so forgive me if I'm misreading parts.

Because you're returning a static set of data from ClaimOwnershipAsync and not overriding ListOwnershipAsync or ListPartitionIdsAsync, each time the load balancing loop ticks it will make a Storage request to list all of the ownership blobs in the container. Since you're not writing out any ownership data, when the processor inspects the ownership list and compares it to the set of available partitions, it does not believe that it has its fair share of work. Unless another processor is running without this customization, each processor will see itself as the only active processor for the cluster and will believe that it should own all partitions. Because of this, the processor is going to continue to try to claim a partition during each load balancing cycle and will never see ownership as balanced.

In v5.6.2, the processor defaulted to the Balanced strategy with more aggressive defaults for the load balancing intervals. This means that your load balancing loop was ticking every 10 seconds and making the aforementioned Storage calls. In the v5.7.x line, we made some adjustments based on the data we've gathered from production issues over the last two years. Part of those tweaks was setting the strategy to Greedy by default to offset the slower loop interval. For the normal case, this reduces Storage calls by 2/3 once the processor has reached a state where ownership is balanced. Therein lies the problem.

The Greedy strategy ignores the load balancing interval when the processor believes that it has not reached its ownership quota. Because your processor will never see ownership as balanced, load balancing will continue to run without pause, each time making a Storage request.

The good news is that the basic mitigation is simple: setting LoadBalancingStrategy to Balanced in the EventProcessorClientOptions will stop the constant loop iterations. Because we increased the load balancing interval, your loop will run every 30 seconds rather than every 10.
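
A minimal sketch of that mitigation, assuming the options are forwarded to the base constructor (the options parameter and the helper below are not part of the original gist):

open Azure.Messaging.EventHubs
open Azure.Messaging.EventHubs.Processor
open Azure.Storage.Blobs

// Sketch: pin the load balancing strategy to Balanced so the loop only runs on its interval.
let options = EventProcessorClientOptions(LoadBalancingStrategy = LoadBalancingStrategy.Balanced)

// A subclass like MyEventProcessorClient would simply pass these options on to the base constructor:
// inherit EventProcessorClient(blobContainer, consumerGroup, connectionString, eventHubName, options)
let balancedProcessor (blobContainer: BlobContainerClient) (consumerGroup: string) (connectionString: string) (eventHubName: string) =
    EventProcessorClient(blobContainer, consumerGroup, connectionString, eventHubName, options)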

That said, the better approach would be to override ListPartitionIdsAsync and ListOwnershipAsync as well, telling the processor that its assigned partition is the only one and that it already owns it. This will allow you to bypass Storage entirely. The details of doing so are demonstrated in this sample.

@jsquire commented May 16, 2022

It appears that you're checkpointing after every event. This is valid, but it will cause a high number of Storage requests, driving up costs and slowing down throughput. Generally, most applications use a strategy where they checkpoint after every XX events or after YY time has elapsed (for example, every 100 events or every 5 minutes).

The thresholds would be specific to your application and depend on how expensive it is to reprocess events should there be a failure that requires rewinding to the last checkpoint recorded.
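
A minimal sketch of that pattern applied to the handler above, assuming hypothetical thresholds of 100 events or 5 minutes; plain mutable fields are enough here because each processor owns exactly one partition:

let mutable eventsSinceCheckpoint = 0
let mutable lastCheckpoint = DateTimeOffset.UtcNow

processor.add_ProcessEventAsync(fun args ->
    task {
        // ... deserialize and handle the event as in the original handler ...
        eventsSinceCheckpoint <- eventsSinceCheckpoint + 1
        // Only write a checkpoint blob every 100 events or every 5 minutes,
        // instead of once per event.
        if eventsSinceCheckpoint >= 100
           || DateTimeOffset.UtcNow - lastCheckpoint >= TimeSpan.FromMinutes 5.0 then
            do! args.UpdateCheckpointAsync(stoppingToken)
            eventsSinceCheckpoint <- 0
            lastCheckpoint <- DateTimeOffset.UtcNow
    } :> Task
)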

@forki (Author) commented May 16, 2022


type MyEventProcessorClient<'a>(blobContainer:BlobContainerClient,consumerGroup:string,connectionString:string,eventHubName:string,logger : ILogger<'a>,partitionID:string) =
    inherit EventProcessorClient(blobContainer,consumerGroup,connectionString,eventHubName)

    // This processor is pinned to a single partition.
    let assignedPartitions = [| partitionID |]

    // Accept all requested ownership locally instead of writing ownership blobs to Storage.
    override __.ClaimOwnershipAsync(desiredOwnership:System.Collections.Generic.IEnumerable<_>,_cancellationToken:CancellationToken) = task {
        return
            desiredOwnership
            |> Seq.map (fun ownership ->
                ownership.LastModifiedTime <- DateTimeOffset.UtcNow
                ownership
            )
    }

    // Report only the assigned partition instead of querying the Event Hub for all partition ids.
    override __.ListPartitionIdsAsync(_connection:EventHubConnection,_cancellationToken:CancellationToken) =
        Task.FromResult assignedPartitions

    // Report the assigned partition as already owned by this processor, bypassing Storage entirely.
    override this.ListOwnershipAsync(_cancellationToken:CancellationToken) = task {
        return
            assignedPartitions
            |> Seq.map (fun partition ->
                let ownership = EventProcessorPartitionOwnership()
                ownership.FullyQualifiedNamespace <- this.FullyQualifiedNamespace
                ownership.EventHubName <- this.EventHubName
                ownership.ConsumerGroup <- this.ConsumerGroup
                ownership.PartitionId <- partition
                ownership.OwnerIdentifier <- this.Identifier
                ownership.LastModifiedTime <- DateTimeOffset.UtcNow
                ownership
            )
    }

does this look better?

@jsquire commented May 16, 2022

That looks about right to me.
