@isaacabraham
Last active January 15, 2016 23:40
Mailbox Processor in Cloud Agent with automatic dead lettering and at-least-once processing.
let CreateActor (ActorKey name) =
    MailboxProcessor.Start(fun mailbox ->
        let messageStore = GetMessageStore name
        let rec loop data =
            async {
                // Wait asynchronously until we get a message + reply channel
                let! message, replyWith = mailbox.Receive()
                match message with
                | Clear ->
                    messageStore.DeleteIfExists()
                    replyWith Completed // confirm processing!
                    return! loop { Count = 0; Messages = [] }
                | Record message when data.Count < 5 ->
                    let updatedData =
                        { data with
                            Count = data.Count + 1
                            Messages = data.Messages @ [ message ] }
                    messageStore.SetData(updatedData)
                    replyWith Completed // confirm processing!
                    return! loop updatedData
                | Record _ ->
                    // bad data - max limit of messages is 5
                    replyWith Abandoned
                    // keep the agent running with its state unchanged
                    return! loop data
            }
        // Resume from previously persisted state if there is any, otherwise start empty
        let data = defaultArg (messageStore.GetData()) { Count = 0; Messages = [] }
        loop data)
// Change how we start listening to the pool as well - ResilientCloudAgent instead of BasicCloudAgent
let disposable = ConnectionFactory.StartListening(cloudConnection, CreateActor >> ResilientCloudAgent)
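
The snippet above relies on types that the gist does not define (the message cases, the reply statuses, and the state record). The following is a minimal local sketch of what they might look like, so the message loop can be tried without any cloud connection. Every name here (`Message`, `Status`, `State`, `Envelope`, `createLocalActor`, `send`) is an assumption made for illustration, the library's real types may differ, and the message store is left out entirely.

```fsharp
// Hypothetical definitions: none of these appear in the gist itself.
type Message =
    | Clear
    | Record of string

type Status =
    | Completed
    | Abandoned

type State = { Count : int; Messages : string list }

// Each mailbox item pairs a message with a callback that reports the outcome.
type Envelope = Message * (Status -> unit)

// Same loop shape as CreateActor, minus persistence.
let createLocalActor () =
    MailboxProcessor<Envelope>.Start(fun mailbox ->
        let rec loop data =
            async {
                let! message, replyWith = mailbox.Receive()
                match message with
                | Clear ->
                    replyWith Completed
                    return! loop { Count = 0; Messages = [] }
                | Record text when data.Count < 5 ->
                    replyWith Completed
                    return! loop { Count = data.Count + 1
                                   Messages = data.Messages @ [ text ] }
                | Record _ ->
                    // over the limit of 5: reject, keep the state as it was
                    replyWith Abandoned
                    return! loop data
            }
        loop { Count = 0; Messages = [] })

// Post a message and block until the agent reports a status.
let send (agent : MailboxProcessor<Envelope>) (message : Message) : Status =
    agent.PostAndReply(fun (channel : AsyncReplyChannel<Status>) ->
        (message, (fun status -> channel.Reply status)))

let agent = createLocalActor ()

// Six records against a limit of five.
let results = [ for i in 1 .. 6 -> send agent (Record (sprintf "m%d" i)) ]
```

With this sketch, the first five entries of `results` are `Completed` and the sixth is `Abandoned`. In the gist, the reply callback is presumably supplied by the cloud connection rather than by `PostAndReply`.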