Created
April 7, 2020 07:14
-
-
Save isaacabraham/87f12aa0044223b0623ab108f699213b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(* QUEUES
   https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentqueue-1
   https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.queue-1
*)
// 1.0 Basic queue - hand-rolled
// The listener in 1.1 is started with Async.Start, so it dequeues on a background thread while
// anything enqueued afterwards comes from a different thread. System.Collections.Generic.Queue<T>
// is not safe for that kind of concurrent use, so a ConcurrentQueue<T> is used here instead;
// Enqueue and Count are called exactly as before.
let q = System.Collections.Concurrent.ConcurrentQueue<string>()
q.Enqueue "Hello"
q.Enqueue "Hello1"
q.Enqueue "Hello2"
q.Enqueue "Hello3"
q.Enqueue "Hello4"
// 1.1. Creating a "listener" to the queue by polling
// Loops forever: takes one message per iteration while work is available, and sleeps for five
// seconds whenever the queue turns out to be empty.
async {
    while true do
        printfn "Checking queue..."
        // TryDequeue checks for and removes the head item in one atomic step, so there is no
        // window between "is there work?" and "take the work".
        match q.TryDequeue() with
        | true, message ->
            printfn "Got some work!"
            printfn "%s" message
        | false, _ ->
            printfn "Nothing on the queue!"
            do! Async.Sleep 5000
} |> Async.Start
(* MAILBOX PROCESSOR
   https://fsharpforfunandprofit.com/posts/concurrency-actor-model/
*)
(* 2.1 - Simple agent *)
// Every number posted to this agent is handled strictly one at a time, in arrival order.
let simpleAgent =
    MailboxProcessor.Start(fun inbox ->
        // Handle one message, then recurse to wait for the next one. The loop never finishes.
        let rec listen () =
            async {
                let! number = inbox.Receive()
                //Uncomment this line to simulate a "long running" process
                //do! Async.Sleep 1000
                printfn "RECEIVED A NUMBER: %d" number
                return! listen ()
            }

        listen ())
// Post one message on its own...
simpleAgent.Post 0
// ...then post the numbers 1 through 20, in ascending order
[ 1 .. 20 ] |> List.iter simpleAgent.Post
(* 2.2 - Replying agent *)
// Each message is a pair: a reply function and a number. The agent handles messages one at a
// time and answers every request by calling the reply function with the number doubled.
let replyingAgent =
    MailboxProcessor.Start(fun inbox ->
        let rec serve () =
            async {
                let! respond, number = inbox.Receive()
                printfn "DOUBLING A NUMBER: %d" number
                // Hand the result straight back to whoever sent the request
                respond (number * 2)
                return! serve ()
            }

        serve ())
// Ask the agent to double 10; PostAndReply blocks until the agent has replied
let answer =
    replyingAgent.PostAndReply(fun replyChannel -> (replyChannel.Reply, 10))
(* 2.3 - Command agent *)
// The requests this agent understands are modelled as a discriminated union - a common pattern
// when working with MailboxProcessors. Each case carries the two integer operands.
type Command =
    | Add of int * int
    | Multiply of int * int
    | Divide of int * int
    | Subtract of int * int

// Receives (reply function, Command) pairs and replies with the result of the arithmetic.
let multipleMessages =
    MailboxProcessor.Start(fun inbox ->
        // Pure evaluation of a single command.
        // NOTE(review): Divide with a zero divisor raises here, before any reply has been sent,
        // and nothing in the loop catches it - callers should guard against a zero divisor.
        let evaluate command =
            match command with
            | Add (x, y) -> x + y
            | Multiply (x, y) -> x * y
            | Divide (x, y) -> x / y
            | Subtract (x, y) -> x - y

        let rec serve () =
            async {
                let! respond, command = inbox.Receive()
                respond (evaluate command)
                return! serve ()
            }

        serve ())
(* 2.4 State machine *)
// This agent remembers whether it is switched on and interprets every command against that.
// The current state is passed along the recursive loop as an argument, so it is only ever
// visible to the agent itself.
type State =
    | On
    | Off

type StateCommand =
    | TurnOn
    | TurnOff
    | DoWork

let stateMachineAgent =
    MailboxProcessor.Start(fun inbox ->
        // Handles one command and returns the state the agent is in afterwards.
        let transition current command =
            match current, command with
            | On, DoWork ->
                printfn "Doing some work!"
                On
            | Off, DoWork ->
                printfn "Ignored, I'm not active!"
                Off
            | On, TurnOn ->
                printfn "Already on!"
                On
            | Off, TurnOn -> On
            | On, TurnOff -> Off
            | Off, TurnOff ->
                printfn "Already off!"
                Off

        let rec run current =
            async {
                let! command = inbox.Receive()
                return! run (transition current command)
            }

        // The agent starts out switched off
        run Off)
// Drive the agent through a short scenario; commands are posted in the order listed.
List.iter
    stateMachineAgent.Post
    [ DoWork // oops! the agent has not been turned on yet
      TurnOn
      DoWork // works
      TurnOff
      TurnOff ] // the second TurnOff finds the agent already off
// etc.
(* 2.5 Progress Reporting *)
// This agent runs at most one piece of work at a time and turns down further requests while
// that work is in progress. Its current status can be queried at any moment, even mid-work.
type ProgressState =
    | Idle
    | Working

type ProgressCommand =
    | DoWork
    | ReportComplete
    | GetStatus of reply: (ProgressState -> unit)

let progressAgent =
    MailboxProcessor.Start(fun inbox ->
        // The "child" workflow that performs the work away from the agent's own loop and posts
        // ReportComplete back to the mailbox once it has finished.
        let backgroundJob =
            async {
                printfn "Doing some work..."
                do! Async.Sleep 10000
                printfn "Done!"
                inbox.Post ReportComplete
            }

        // The current status is threaded through the loop as an argument.
        let rec run current =
            async {
                let! command = inbox.Receive()

                match command, current with
                | DoWork, Idle ->
                    // Kick the job off without waiting for it, so the agent stays responsive
                    Async.Start backgroundJob
                    return! run Working
                | DoWork, Working ->
                    printfn "Can't work, busy. Try again later!"
                    return! run current
                | ReportComplete, _ -> return! run Idle
                | GetStatus reply, _ ->
                    reply current
                    return! run current
            }

        run Idle)
// Asks the agent for its current status and blocks until it answers.
let queryStatus () =
    progressAgent.PostAndReply(fun channel -> GetStatus channel.Reply)

queryStatus ()
progressAgent.Post DoWork
queryStatus ()
progressAgent.Post DoWork
// Wait 10 seconds!
queryStatus ()
progressAgent.Post DoWork
queryStatus ()
(* 2.6 Progress Reporting via Callback *)
// As in 2.5, except the agent does not answer status queries via a message. Instead it pushes
// every status change to a callback, which must deal with it. Unlike 2.5 it can queue up work:
// the work is done inside the agent's own loop, so further DoWork messages simply wait in the
// mailbox until the current one has finished.
type CallbackState =
    | Idle
    | Working

// A single case without fields needs the leading bar; without it F# reads the definition as a
// type abbreviation for a (non-existent) type called DoWork.
type CallbackCommand =
    | DoWork

// onStatusChange: invoked from inside the agent's loop with the new state on every transition.
let callbackAgent onStatusChange =
    MailboxProcessor.Start(fun mailbox ->
        async {
            let mutable state = Idle

            // Record the new state first, then tell the outside world about it
            let updateState newState =
                state <- newState
                onStatusChange newState

            while true do
                let! command = mailbox.Receive()

                match command, state with
                | DoWork, Idle ->
                    updateState Working
                    printfn "Doing some work..."
                    do! Async.Sleep 2000
                    printfn "Done!"
                    updateState Idle
                | DoWork, Working ->
                    // Kept so the match is exhaustive. The state is only Working while the
                    // branch above is running, and no message is received during that time.
                    printfn "Can't work, busy. Try again later!"
        })
// Careful here! This works for a script but is not appropriate for a production application!
// The callback is invoked from inside the agent's loop and writes to a mutable value that the
// script then reads directly.
let mutable currentState = Idle
let agent = callbackAgent (fun latest -> currentState <- latest)

// Queue up three pieces of work in one go
for _ in 1 .. 3 do
    agent.Post DoWork

// Inspect how many messages are still waiting, and the most recently reported state
agent.CurrentQueueLength
currentState
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment