Skip to content

Instantly share code, notes, and snippets.

@forki
Last active May 16, 2022 17:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save forki/e6ab43b4e7ec9321bb19ef1df6f92086 to your computer and use it in GitHub Desktop.
Save forki/e6ab43b4e7ec9321bb19ef1df6f92086 to your computer and use it in GitHub Desktop.
// DEFINE PROCESSOR
/// An `EventProcessorClient` whose ownership claims are narrowed to one fixed partition.
///
/// The first four constructor arguments are forwarded unchanged to the base
/// `EventProcessorClient` constructor. `partitionID` is the id of the only
/// partition this instance reports as claimed. `logger` is accepted but is not
/// referenced anywhere inside this type.
type MyEventProcessorClient<'a>
    (
        blobContainer: BlobContainerClient,
        consumerGroup: string,
        connectionString: string,
        eventHubName: string,
        logger: ILogger<'a>,
        partitionID: string
    ) =
    inherit EventProcessorClient(blobContainer, consumerGroup, connectionString, eventHubName)

    /// Returns, wrapped in a task, only those requested ownership entries whose
    /// `PartitionId` equals `partitionID`. The base implementation is not
    /// invoked and `token` is not consulted.
    override _.ClaimOwnershipAsync(partitions: System.Collections.Generic.IEnumerable<_>, token: CancellationToken) =
        task {
            let claimed =
                partitions
                |> Seq.filter (fun ownership -> ownership.PartitionId = partitionID)

            return claimed
        }
// USAGE
// Build the processor instance used by the handler registrations below it.
// The connection string comes from calling `.Force()` on `eventHubsConnectionKey`,
// the event hub name is the literal "myevents", and `partition` is handed over
// in its string form as the partition id.
let processor =
    let connectionString = eventHubsConnectionKey.Force()
    let partitionId = string partition

    new MyEventProcessorClient<Worker>(
        blobContainer,
        consumerGroup,
        connectionString,
        "myevents",
        _logger,
        partitionId
    )
// Per-event handler: logs the partition, decodes the event body as UTF-8 JSON
// into a `MyEvent`, passes it to `EventHandler.run`, and then checkpoints.
processor.add_ProcessEventAsync(fun args ->
    task {
        // Constant message templates with arguments (rather than interpolated
        // strings) keep the values available as structured log properties.
        _logger.LogInformation("Partition: {Partition} {PartitionKey}", partition, args.Data.PartitionKey)

        try
            // Decoding and deserialization live inside the `try` so that a
            // malformed payload is handled by the same log-and-continue policy
            // as a failure in `EventHandler.run`, instead of throwing out of
            // the handler.
            let text = Encoding.UTF8.GetString(args.Data.EventBody.ToArray())
            let message = Newtonsoft.Json.JsonConvert.DeserializeObject<MyEvent> text
            do! EventHandler.run _logger message
        with exn ->
            // Logged at Error level with the exception attached so the stack
            // trace is preserved; the rendered text is still "Error: <message>".
            _logger.LogError(exn, "Error: {Message}", exn.Message)

        // Best-effort semantics kept from the original: the checkpoint is
        // written whether or not processing of this event succeeded.
        do! args.UpdateCheckpointAsync(stoppingToken)
    }
    :> Task)
// Error handler: records exceptions that the processor reports through
// `ProcessErrorAsync`.
processor.add_ProcessErrorAsync(fun args ->
    task {
        // Error level plus the exception object (not just its message) so the
        // failure is visible to error-level filters and keeps its stack trace.
        // The rendered text is still "ProcessError: <message>".
        _logger.LogError(args.Exception, "ProcessError: {Message}", args.Exception.Message)
    }
    :> Task)
@jsquire
Copy link

jsquire commented May 16, 2022

That looks about right to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment