Skip to content

Instantly share code, notes, and snippets.

@forki
Last active May 16, 2022 17:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save forki/e6ab43b4e7ec9321bb19ef1df6f92086 to your computer and use it in GitHub Desktop.
Save forki/e6ab43b4e7ec9321bb19ef1df6f92086 to your computer and use it in GitHub Desktop.
// DEFINE PROCESSOR
/// Event processor client restricted to a single partition.
/// The first four constructor arguments are forwarded unchanged to EventProcessorClient.
/// ClaimOwnershipAsync is overridden so that, of the ownership entries it is handed,
/// only those whose PartitionId equals partitionID are returned.
/// NOTE(review): logger is accepted by the constructor but never used inside this type.
type MyEventProcessorClient<'a>(blobContainer:BlobContainerClient,consumerGroup:string,connectionString:string,eventHubName:string,logger : ILogger<'a>,partitionID:string) =
    inherit EventProcessorClient(blobContainer,consumerGroup,connectionString,eventHubName)

    /// Returns the subset of the offered ownership entries that target partitionID.
    /// The cancellation token is not consulted.
    override _.ClaimOwnershipAsync(partitions:System.Collections.Generic.IEnumerable<_>, token:CancellationToken) =
        task {
            // The result stays lazy: entries are filtered when the caller enumerates it.
            return
                seq {
                    for candidate in partitions do
                        if candidate.PartitionId = partitionID then
                            yield candidate
                }
        }
// USAGE
// Build the processor for the "myevents" hub. The two computed arguments are
// bound to names first, in the same order the constructor call evaluated them.
let processor =
    // presumably a lazily created connection string; verify against its definition
    let connectionString = eventHubsConnectionKey.Force()
    let partitionId = string partition
    new MyEventProcessorClient<Worker>(blobContainer, consumerGroup, connectionString, "myevents", _logger, partitionId)
// Handle each received event: decode the UTF-8 JSON body into MyEvent, pass it to
// EventHandler.run, then update the checkpoint.
processor.add_ProcessEventAsync(fun args ->
    task {
        _logger.LogInformation($"Partition: {partition} {args.Data.PartitionKey}")
        try
            // Decoding and deserialization live inside the try so a malformed payload
            // is logged like any other handler failure instead of escaping the handler.
            let text = Encoding.UTF8.GetString(args.Data.EventBody.ToArray())
            let message = Newtonsoft.Json.JsonConvert.DeserializeObject<MyEvent> text
            do! EventHandler.run _logger message
        with
        | ex ->
            // Log at Error level and pass the exception itself so the stack trace is kept.
            _logger.LogError(ex, $"Error: {ex.Message}")
        // The checkpoint is still advanced after a failure (unchanged flow), so a
        // failing event is skipped rather than processed again.
        do! args.UpdateCheckpointAsync(stoppingToken)
    } :> Task
)
// Log errors reported through the processor's ProcessErrorAsync event.
processor.add_ProcessErrorAsync(fun args ->
    task {
        // Error level plus the exception object, so the failure is not filtered out
        // as routine information and its stack trace reaches the log.
        _logger.LogError(args.Exception, $"ProcessError: {args.Exception.Message}")
    } :> Task
)
@forki
Copy link
Author

forki commented May 16, 2022


/// Event processor client pinned to one statically assigned partition.
/// The first four constructor arguments are forwarded unchanged to EventProcessorClient.
/// The three overrides below report partitionID as the only partition, report this
/// instance as its owner, and accept every ownership claim they are handed.
/// NOTE(review): logger is accepted by the constructor but never used inside this type.
type MyEventProcessorClient<'a>(blobContainer:BlobContainerClient,consumerGroup:string,connectionString:string,eventHubName:string,logger : ILogger<'a>,partitionID:string) =
    inherit EventProcessorClient(blobContainer,consumerGroup,connectionString,eventHubName)

    // The only partition this instance lists and reports ownership for.
    let assignedPartitions = [| partitionID |]

    /// Accepts every requested claim and stamps it with the current UTC time.
    /// The cancellation token is not consulted.
    override __.ClaimOwnershipAsync(desiredOwnership:System.Collections.Generic.IEnumerable<_>,_cancellationToken:CancellationToken) = task {
        // Materialize first: the timestamp update must happen exactly once, now,
        // not lazily (and repeatedly) whenever the caller enumerates the result.
        let claimed = Seq.toArray desiredOwnership
        let now = DateTimeOffset.UtcNow
        for ownership in claimed do
            ownership.LastModifiedTime <- now
        return (claimed :> seq<_>)
    }

    /// Reports the single assigned partition instead of querying the connection.
    override __.ListPartitionIdsAsync(_connection:EventHubConnection,_cancellationToken:CancellationToken) =
        Task.FromResult assignedPartitions

    /// Builds one ownership record per assigned partition, owned by this instance
    /// and stamped with the current UTC time.
    override this.ListOwnershipAsync(_cancellationToken:CancellationToken) = task {
        // Built eagerly into an array so repeated enumeration by the caller sees the
        // same record instances rather than freshly constructed ones each pass.
        let owned =
            assignedPartitions
            |> Array.map (fun partition ->
                let ownership = EventProcessorPartitionOwnership()
                ownership.FullyQualifiedNamespace <- this.FullyQualifiedNamespace
                ownership.EventHubName <- this.EventHubName
                ownership.ConsumerGroup <- this.ConsumerGroup
                ownership.PartitionId <- partition
                ownership.OwnerIdentifier <- this.Identifier
                ownership.LastModifiedTime <- DateTimeOffset.UtcNow
                ownership)
        return (owned :> seq<_>)
    }

Does this look better?

@jsquire
Copy link

jsquire commented May 16, 2022

That looks about right to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment