Skip to content

Instantly share code, notes, and snippets.

@dburriss
Forked from isaacabraham/MailboxSamples
Last active August 15, 2020 09:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dburriss/51f750cf797140a2ac48576bc53e6117 to your computer and use it in GitHub Desktop.
Save dburriss/51f750cf797140a2ac48576bc53e6117 to your computer and use it in GitHub Desktop.
(* QUEUES
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.queue-1
*)
// 1.0 Basic queue - hand-rolled
let q = System.Collections.Generic.Queue<string>()
q.Enqueue "Hello"
q.Enqueue "Hello1"
q.Enqueue "Hello2"
q.Enqueue "Hello3"
q.Enqueue "Hello4"
// 1.1. Creating a "listener" to the queue by polling
async {
while true do
printfn "Checking queue..."
if q.Count = 0 then
printfn "Nothing on the queue!"
do! Async.Sleep 5000
else
printfn "Got some work!"
let message = q.Dequeue()
printfn "%s" message
} |> Async.Start
(* MAILBOX PROCESSOR
https://fsharpforfunandprofit.com/posts/concurrency-actor-model/
*)
(* 2.1 - Simple agent *)
// This agent can accept a list of work and will guarantee processing them sequentially.
let simpleAgent =
MailboxProcessor.Start(fun mailbox ->
// Loop indefinitely
async {
while true do
let! message = mailbox.Receive()
//Uncomment this line to simulate a "long running" process
//do! Async.Sleep 1000
printfn "RECEIVED A NUMBER: %d" message
}
)
// Send a single message
simpleAgent.Post 0
// Send 20 messages to the agent
for i in 1 .. 20 do
simpleAgent.Post i
(* 2.2 - Replying agent *)
// This agent can accept a list of work and will guarantee processing them sequentially. It will
// immediately reply with the answer.
let replyingAgent =
MailboxProcessor.Start(fun mailbox ->
async {
while true do
let! reply, message = mailbox.Receive()
printfn "DOUBLING A NUMBER: %d" message
reply (message * 2)
}
)
// Send a single message
let answer = replyingAgent.PostAndReply(fun channel -> channel.Reply, 10)
(* 2.3 - Command agent *)
// This agent accepts a number of different commands via a DU. This is a common pattern for working
// with MBPs.
type Command = Add of int * int | Multiply of int * int | Divide of int * int | Subtract of int * int
let multipleMessages =
MailboxProcessor.Start(fun mailbox ->
async {
while true do
let! reply, message = mailbox.Receive()
match message with
| Add (a,b) -> reply (a + b)
| Multiply (a,b) -> reply (a * b)
| Divide (a,b) -> reply (a / b)
| Subtract (a,b) -> reply (a - b)
}
)
(* 2.4 State machine *)
// This agent tracks its state based on commands. We can safely use mutable state here because
// only the agent itself can access it.
type State = On | Off
type StateCommand = TurnOn | TurnOff | DoWork
let stateMachineAgent =
MailboxProcessor.Start(fun mailbox ->
async {
let mutable state = Off
while true do
let! command = mailbox.Receive()
match command, state with
| DoWork, On -> printfn "Doing some work!"
| DoWork, Off -> printfn "Ignored, I'm not active!"
| TurnOn, On -> printfn "Already on!"
| TurnOn, Off -> state <- On
| TurnOff, On -> state <- Off
| TurnOff, Off -> printfn "Already off!"
}
)
stateMachineAgent.Post DoWork // oops!
stateMachineAgent.Post TurnOn
stateMachineAgent.Post DoWork // works
stateMachineAgent.Post TurnOff
stateMachineAgent.Post TurnOff
// etc.
(* 2.5 Progress Reporting *)
// This agent accepts only one piece of work at a time, and will reject more requests whilst one
// is in progress. It will allow you to get the current status whilst it is working.
type ProgressState = Idle | Working
type ProgressCommand = DoWork | ReportComplete | GetStatus of reply:(ProgressState -> unit)
let progressAgent =
MailboxProcessor.Start(fun mailbox ->
async {
let mutable state = Idle
while true do
let! command = mailbox.Receive()
match command, state with
| DoWork, Idle ->
state <- Working
// Spawn a "child" async workflow to do the work and let the mailbox know when it's complete
async {
printfn "Doing some work..."
do! Async.Sleep 10000
printfn "Done!"
mailbox.Post ReportComplete
} |> Async.Start
| DoWork, Working ->
printfn "Can't work, busy. Try again later!"
| ReportComplete, _ ->
state <- Idle
| GetStatus reply, _ ->
reply state
}
)
progressAgent.PostAndReply(fun channel -> GetStatus channel.Reply)
progressAgent.Post DoWork
progressAgent.PostAndReply(fun channel -> GetStatus channel.Reply)
progressAgent.Post DoWork
// Wait 10 seconds!
progressAgent.PostAndReply(fun channel -> GetStatus channel.Reply)
progressAgent.Post DoWork
progressAgent.PostAndReply(fun channel -> GetStatus channel.Reply)
(* 2.5 Progress Reporting via Callback *)
// As above, except the agent does not provide the current status via a message. Instead, it provides
// status updates via a callback who must deal with the message. It can queue up work as well, unlike the above.
type CallbackState = Idle | Working
type CallbackCommand = DoWork
let callbackAgent onStatusChange =
MailboxProcessor.Start(fun mailbox ->
async {
let mutable state = Idle
let updateState newState =
state <- newState
onStatusChange newState
while true do
let! command = mailbox.Receive()
match command, state with
| DoWork, Idle ->
updateState Working
printfn "Doing some work..."
do! Async.Sleep 2000
printfn "Done!"
updateState Idle
| DoWork, Working ->
printfn "Can't work, busy. Try again later!"
}
)
// Careful here! This works for a script but is not appropriate for a production application!
let mutable currentState = Idle
let agent = callbackAgent (fun state -> currentState <- state)
agent.Post DoWork
agent.Post DoWork
agent.Post DoWork
agent.CurrentQueueLength
currentState
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment