Skip to content

Instantly share code, notes, and snippets.

@isaacabraham
Last active August 15, 2020 09:36
Show Gist options
  • Save isaacabraham/4e6b6bf1ab0c14a26eb4 to your computer and use it in GitHub Desktop.
Save isaacabraham/4e6b6bf1ab0c14a26eb4 to your computer and use it in GitHub Desktop.
Demonstrates how to use F# mailbox processors in conjunction with Azure Storage Queues.
/// Code to bind F# mailbox processors to Azure Storage Queues.
module AzureMailboxProcessor
open System
module private Async =
    /// Awaits a non-generic Task and yields unit once it has run to completion.
    /// Unlike a bare `Async.AwaitIAsyncResult >> Async.Ignore`, the outcome of the
    /// task is inspected: a faulted task re-raises its AggregateException and a
    /// cancelled task raises TaskCanceledException, so failed queue operations are
    /// no longer silently treated as successes.
    let AwaitTaskEmpty (task : System.Threading.Tasks.Task) : Async<unit> =
        async {
            // AwaitIAsyncResult only waits for the task to finish; it never looks at the result.
            let! _ = Async.AwaitIAsyncResult task
            if task.IsFaulted then raise task.Exception
            elif task.IsCanceled then raise (System.Threading.Tasks.TaskCanceledException task)
        }
module private Option =
    /// Converts a .NET Nullable into an F# option: Some when it holds a value, None otherwise.
    let fromNullable (nullable:Nullable<_>) =
        match nullable.HasValue with
        | true -> Some nullable.Value
        | false -> None
    /// Converts an F# option into a .NET Nullable: None becomes the empty Nullable.
    let toNullable candidate =
        match candidate with
        | None -> Nullable()
        | Some value -> Nullable value
open Microsoft.WindowsAzure.Storage
open Microsoft.WindowsAzure.Storage.Queue
open Newtonsoft.Json
/// Resolves a queue reference from a storage connection string and a queue name.
/// The connection string is parsed into a storage account, a queue client is
/// created from it, and the client's reference for `queueName` is returned.
/// Nothing in this function creates the queue.
let private getQueue (connectionString, queueName) =
    let account = CloudStorageAccount.Parse connectionString
    account.CreateCloudQueueClient().GetQueueReference(queueName)
/// Creates a mailbox processor that writes to an Azure storage queue.
/// Every message posted to the returned agent is serialised to JSON and added
/// to the queue identified by (connectionString, queueName).
/// The agent is returned unstarted; the caller must invoke Start() on it.
let createQueueWriter<'a> (connectionString, queueName) =
    let queue = getQueue(connectionString, queueName)
    new MailboxProcessor<'a>(fun inbox ->
        // Forward one payload to the queue, then recurse to wait for the next one.
        let rec forward () =
            async {
                let! payload = inbox.Receive()
                let json = JsonConvert.SerializeObject payload
                do! queue.AddMessageAsync(CloudQueueMessage(json)) |> Async.AwaitTaskEmpty
                return! forward ()
            }
        forward ())
/// The different completion statuses a message can have.
/// An agent replies with one of these values to tell the queue binding what to
/// do with the message it has just been handed; 'a is the payload type.
type MessageProcessedStatus<'a> =
    /// The message successfully completed.
    | Completed
    /// The message was not processed successfully and should be returned to the queue for processing again.
    | Failed
    /// Replace the original queue message with a new payload.
    | Update of UpdatedPayload : 'a
/// Contains details on the queue subscription.
type QueueSubscriptionOptions =
    { /// How long to wait between polling requests.
      /// Used as the sleep interval whenever the queue returns no message.
      PollTime : TimeSpan
      /// The lease time for a queue message.
      /// Passed to the queue when fetching a message and also used as the time
      /// allowed for the agent to reply; None means no explicit lease is requested
      /// and the reply is awaited indefinitely.
      LeaseLength : TimeSpan option
      /// How many times a message can be dequeued before being permanently removed.
      /// A message whose dequeue count has reached this limit is deleted without
      /// being handed to the agent; None disables the check.
      MaxDequeueCount : int option }
/// Completes a message by deleting it from the given queue.
let private completeMessage message (queue:CloudQueue) =
    let deletion = queue.DeleteMessageAsync message
    deletion |> Async.AwaitTaskEmpty
/// Represents an F# Agent that can be bound to an Azure storage queue.
/// Each message received by the agent is a pair: the deserialised payload and a
/// reply channel on which the agent must report a MessageProcessedStatus.
type AzureStorageQueueAgent<'a> = MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>
/// Binds a MailboxProcessor to an Azure storage queue.
///
/// Starts a background polling loop that fetches messages from the queue
/// identified by (connectionString, queueName), hands each deserialised payload
/// to `agent`, and acts on the status the agent replies with:
///   * Completed      - the message is deleted from the queue.
///   * Failed / none  - the message is left alone so it reappears once its lease expires.
///   * Update payload - a message carrying the new payload is enqueued, then the original is deleted.
///
/// `options` controls the poll interval, the message lease (also used as the
/// reply timeout) and the optional dequeue-count limit. The function returns
/// immediately; the loop runs via Async.Start.
let bindToQueue<'a>((connectionString, queueName), options) (agent:AzureStorageQueueAgent<'a>) =
    let queue = getQueue(connectionString, queueName)
    // The agent gets as long as the lease to reply; without a lease we wait indefinitely.
    let replyTimeout =
        match options.LeaseLength with
        | Some lifetime -> lifetime.TotalMilliseconds |> int
        | None -> Threading.Timeout.Infinite
    async {
        while true do
            let! message = queue.GetMessageAsync(options.LeaseLength |> Option.toNullable, null, null) |> Async.AwaitTask
            match message with
            | null ->
                // Queue is empty: back off before polling again.
                do! Async.Sleep(options.PollTime.TotalMilliseconds |> int)
            | message ->
                match message.DequeueCount, options.MaxDequeueCount with
                | count, Some limit when count >= limit ->
                    // Message dequeue count exceeded, just delete the message
                    do! queue |> completeMessage message
                | _ ->
                    try
                        let payload = JsonConvert.DeserializeObject<'a>(message.AsString)
                        // Await the reply asynchronously rather than blocking a thread-pool thread
                        // for up to the whole lease.
                        let! status = agent.PostAndTryAsyncReply((fun ch -> payload, ch), replyTimeout)
                        match status with
                        | None // there was no reply from the agent within specified time; lease has expired.
                        | Some Failed -> () // message has failed to process; do not complete the message.
                        | Some Completed -> do! queue |> completeMessage message
                        | Some (Update payload) ->
                            // Enqueue the replacement first: if this fails the original is still on
                            // the queue, so the message cannot be lost between the two operations.
                            do! queue.AddMessageAsync(CloudQueueMessage(payload |> JsonConvert.SerializeObject)) |> Async.AwaitTaskEmpty
                            do! queue |> completeMessage message
                    with ex -> printfn "ARGH %s" ex.Message
    } |> Async.Start
#I @"..\packages\"
#r @"Newtonsoft.Json.6.0.3\lib\net45\Newtonsoft.Json.dll"
#r @"WindowsAzure.Storage.4.1.0\lib\net40\Microsoft.WindowsAzure.Storage.dll"
#load "StorageQueueAgent.fs"
/// Sample payload type used by this script for the messages sent through the queue.
type Person = { Name : string; Age : int }
open AzureMailboxProcessor
open System
// Azure Storage Queue details: (connection string, queue name).
let queueDetails = "UseDevelopmentStorage=true", "sample-queue"
// Reader - an agent that decides, per person, what should happen to the queue message.
let subscriber = new AzureStorageQueueAgent<Person>(fun inbox ->
    // Maps a person onto the status reported back to the queue binding.
    let classify (person : Person) =
        match person.Name with
        | "Isaac"
        | "Richard" -> Completed
        | "Andy" -> Update { person with Name = person.Name + "xyz" }
        | _ -> Failed
    async {
        while true do
            let! person, channel = inbox.Receive()
            // reply with the result - gets adapted by the StorageQueueAgent into the appropriate Azure Storage Queue behaviour
            channel.Reply (classify person)
    })
subscriber.Start()
// Bind the reader to the storage queue: poll every 10 seconds, lease each
// message for 5 seconds, and give up on a message once its dequeue count reaches 3.
bindToQueue
    (queueDetails,
     { PollTime = TimeSpan.FromSeconds 10.
       LeaseLength = Some (TimeSpan.FromSeconds 5.)
       MaxDequeueCount = Some 3 })
    subscriber
// Writer - puts messages onto the storage queue (this would e.g. take place on another machine).
let writer = createQueueWriter<Person> queueDetails
writer.Start()
// Post the sample people in order.
[ { Name = "Isaac"; Age = 34 }
  { Name = "Andy"; Age = 32 }
  { Name = "Richard"; Age = 39 }
  { Name = "Joe Bloggs"; Age = 35 } ]
|> List.iter writer.Post
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment