Skip to content

Instantly share code, notes, and snippets.

@TWith2Sugars
Last active August 29, 2015 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TWith2Sugars/2d6a8c99d2700adcc1a8 to your computer and use it in GitHub Desktop.
Save TWith2Sugars/2d6a8c99d2700adcc1a8 to your computer and use it in GitHub Desktop.
EventHubProcessor in F#
open Microsoft.ServiceBus
open Microsoft.ServiceBus.Messaging
open Microsoft.WindowsAzure.Storage
open System
open System.Diagnostics
open System.Text
open System.Threading.Tasks
open System.Threading
type Async with
    /// Returns an asynchronous computation that waits for the given non-generic
    /// task to complete and then surfaces the task's outcome.
    ///
    /// - If the task ran to completion, the computation yields unit.
    /// - If the task faulted, the task's AggregateException is raised.
    /// - If the task was cancelled, a TaskCanceledException is raised.
    static member AwaitIgnoreTask(task : Task) : Async<unit> =
        async {
            // AwaitIAsyncResult only reports that the wait handle was signalled;
            // it says nothing about whether the task succeeded. Ignoring its
            // result alone would make a failed task look like a success.
            let! _ = task |> Async.AwaitIAsyncResult
            if task.IsFaulted then raise task.Exception
            elif task.IsCanceled then raise (TaskCanceledException(task))
        }
/// Name under which this process identifies itself to the event processor
/// host; it is a freshly generated GUID rendered as text, so it differs on
/// every program start.
let eventProcessorHostName =
    let hostId = Guid.NewGuid()
    hostId.ToString()

/// Consumer group used both when ensuring the group exists and when
/// constructing the event processor host.
[<Literal>]
let EventConsumerGroup = "SOMECONSUMERGROUP"
/// IEventProcessor implementation that processes each batch of events and
/// checkpoints its position at most roughly once per minute.
type Receiver() =
    // Measures the time since the last checkpoint (or since OpenAsync for the
    // first one); used to throttle how often CheckpointAsync is called.
    let stopWatch = Stopwatch()

    interface IEventProcessor with

        /// Logs the shutdown and, when the reason is a regular Shutdown,
        /// writes a final checkpoint for the partition.
        member __.CloseAsync(context : PartitionContext, reason : CloseReason) =
            async {
                printfn "%s Shuting Down. Partition '%s', Reason: '%O'." eventProcessorHostName
                    context.Lease.PartitionId reason
                if reason = CloseReason.Shutdown then
                    do! context.CheckpointAsync() |> Async.AwaitIgnoreTask
            }
            |> Async.StartAsTask :> Task

        /// Logs the partition and starting offset, then starts the checkpoint
        /// timer.
        member __.OpenAsync(context : PartitionContext) =
            async {
                printfn "%s initialize. Partition: '%s', Offset: '%s'" eventProcessorHostName
                    context.Lease.PartitionId context.Lease.Offset
                stopWatch.Start()
            }
            |> Async.StartAsTask :> Task

        /// Runs doSomethingWithMessage over every event of the batch in
        /// parallel and, once more than a minute has passed since the last
        /// checkpoint, checkpoints at the last event of the batch.
        member __.ProcessEventsAsync(context : PartitionContext, messages : EventData seq) =
            async {
                // Materialise the batch once: the sequence is needed for
                // processing, for its length, and for its last element, and
                // re-enumerating an arbitrary seq is wasteful at best.
                let batch = Seq.toArray messages

                // NOTE(review): doSomethingWithMessage is not defined in the
                // portion of the file shown here; from its use it must map an
                // EventData to an Async computation.
                do! batch
                    |> Seq.map doSomethingWithMessage
                    |> Async.Parallel
                    |> Async.Ignore

                // An empty batch has no last event to checkpoint at, so skip it.
                if batch.Length > 0 && stopWatch.Elapsed > TimeSpan.FromMinutes(1.0) then
                    let last = batch.[batch.Length - 1]
                    printfn "processed %i messages, last message receive time: %s" batch.Length
                        (last.EnqueuedTimeUtc.ToString("o"))
                    do! context.CheckpointAsync(last) |> Async.AwaitIgnoreTask
                    // Restart (not Reset): Reset stops the stopwatch, which
                    // would leave Elapsed at zero forever and prevent every
                    // later checkpoint.
                    stopWatch.Restart()
            }
            |> Async.StartAsTask :> Task
/// Program entry point: ensures the consumer group exists on the event hub,
/// registers Receiver with an EventProcessorHost, and then keeps the process
/// alive indefinitely.
[<EntryPoint>]
let main __ =
    // NOTE(review): these literal values look like placeholders to be replaced
    // with real settings - confirm before deploying.
    let hubName = "eventhub"
    let hubConnection = "connectionstring"
    let storageConnection = "storageconnectionstring"

    let manager = NamespaceManager.CreateFromConnectionString(hubConnection)
    let hubDescription = manager.GetEventHub(hubName)
    manager.CreateConsumerGroupIfNotExists(hubDescription.Path, EventConsumerGroup) |> ignore

    let host =
        EventProcessorHost
            (eventProcessorHostName, hubName, EventConsumerGroup, hubConnection, storageConnection)

    // Block until registration has completed before idling.
    host.RegisterEventProcessorAsync<Receiver>().Wait()

    // Keep the main thread parked; this loop never exits.
    while true do
        Thread.Sleep(1000)

    // Never reached; present only to give main its int result type.
    0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment