Last active
July 15, 2020 13:54
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A reply to: https://twitter.com/davidfowl/status/1282445306261471232
// This implements a connection of some kind that can be started and stopped asynchronously.
// We must ensure that we don't try to start it while it is stopping or vice-versa.
// A few internal types first...
/// State that has to be kept around for as long as the connection is up.
/// It is produced when a start completes and handed back when stopping.
type private ConnectedState =
    { Id: int }
/// Internal agent state: where the connection currently is in its lifecycle,
/// together with the reply callbacks registered by callers that are waiting
/// for an in-flight transition to finish.
type private ConnectionStatus =
    /// No connection exists and no transition is in progress.
    | NotRunning
    /// A start is in flight; each callback is invoked with the outcome once it completes.
    | Starting of onStarted: (bool -> unit) list
    /// The connection is up; carries the state held while connected.
    | Running of connectedState: ConnectedState
    /// A stop is in flight; each callback is invoked with the outcome once it completes.
    | Stopping of onStopped: (bool -> unit) list
/// Messages processed by the internal agent: requests coming from callers,
/// plus the completion notices the agent posts back to itself.
/// An AsyncReplyChannel<T> is answered by calling Reply() with a value of type T
/// once the corresponding command has been dealt with.
type private ConnectionCommand =
    /// Request to start; the channel receives whether the start succeeded.
    | Start of chan: AsyncReplyChannel<bool>
    /// Completion notice: the connection is up, with the state to hold on to.
    | Started of connectedState: ConnectedState
    /// Request to stop; the channel receives whether the stop succeeded.
    | Stop of chan: AsyncReplyChannel<bool>
    /// Completion notice: the connection is down.
    | Stopped
    /// Request to shut the agent down; the channel is answered once that is done.
    | Dispose of chan: AsyncReplyChannel<unit>
// And then the main Connection.

/// <summary>
/// A connection that can be started and stopped asynchronously.
/// All state lives inside a single MailboxProcessor, which handles one command
/// at a time, so a start can never overlap a stop or vice-versa.
/// </summary>
/// <remarks>
/// Disposing answers every caller that is still waiting with <c>false</c> and makes
/// all later StartAsync/StopAsync calls return <c>false</c>. It does not tear down a
/// connection that is currently running; call StopAsync first if that is needed.
/// </remarks>
type Connection() =

    /// Establishes the connection and yields the state to keep while connected.
    let startAsyncCore () : Async<ConnectedState> = async {
        // TODO: implement actual connection logic here.
        let connectedState = { Id = 42 }
        return connectedState
    }

    /// Tears down the connection described by the given state.
    let stopAsyncCore (connectedState: ConnectedState) : Async<unit> = async {
        // TODO: implement actual disconnection logic here.
        return ()
    }

    let agent = MailboxProcessor<ConnectionCommand>.Start(fun mb ->

        // Answers every waiting caller with the same outcome.
        let notifyAll (callbacks: list<bool -> unit>) (outcome: bool) =
            for callback in callbacks do
                callback outcome

        // Terminal loop entered once Dispose has been processed. The agent keeps
        // draining its mailbox so that nobody blocks on a reply that never comes:
        // a repeated Dispose() returns immediately, and late Start/Stop requests
        // are refused with false instead of hanging forever.
        let rec disposedLoop () = async {
            let! command = mb.Receive()
            match command with
            | Start chan
            | Stop chan -> chan.Reply(false)
            | Dispose chan -> chan.Reply()
            // Completion notices from work that was still in flight when we were disposed.
            | Started _
            | Stopped -> ()
            return! disposedLoop ()
        }

        // The body of the agent is a simple loop that receives and processes commands,
        // implemented as a recursive function carrying the current status.
        let rec loop (status: ConnectionStatus) = async {
            match! mb.Receive() with
            // We received the order to start. We can only do that if we were not running.
            | Start chan ->
                match status with
                | NotRunning ->
                    // Do the actual work outside the agent using Async.Start,
                    // so that commands can keep being processed in the meantime.
                    Async.Start(async {
                        // Async.Catch turns an exception into a value: without it a failure
                        // would escape this work item, no completion notice would ever be
                        // posted, and the agent would stay in Starting with its callers waiting.
                        let! outcome = Async.Catch(startAsyncCore ())
                        match outcome with
                        | Choice1Of2 connectedState -> mb.Post(Started connectedState)
                        // The start failed, so the connection is down: report that instead.
                        | Choice2Of2 _ -> mb.Post(Stopped)
                    })
                    // Continue the loop, with Starting as the state.
                    return! loop (Starting [chan.Reply])
                | Starting callbacks ->
                    // We're already starting; just add this channel to get notified when that's finished.
                    return! loop (Starting (chan.Reply :: callbacks))
                | Running _ ->
                    // Already running, nothing to do.
                    chan.Reply(true)
                    return! loop status
                | Stopping _ ->
                    // We're not in a state where we can start a connection, so we reply "nope" and keep the state.
                    chan.Reply(false)
                    return! loop status

            // "The connection is up." Posted by the background work started above:
            // after a successful start, or after a failed stop (we are still connected).
            | Started connectedState ->
                match status with
                | Starting callbacks ->
                    // Notify the caller (and other callers who called StartAsync in the meantime).
                    notifyAll callbacks true
                | Stopping callbacks ->
                    // The stop failed; tell the callers waiting on it that it did not succeed.
                    notifyAll callbacks false
                | NotRunning
                | Running _ ->
                    // Not expected; there is nobody to notify.
                    ()
                // Update the state accordingly.
                return! loop (Running connectedState)

            // This is pretty much the same as processing Start.
            | Stop chan ->
                match status with
                | Running connectedState ->
                    Async.Start(async {
                        let! outcome = Async.Catch(stopAsyncCore connectedState)
                        match outcome with
                        | Choice1Of2 () -> mb.Post(Stopped)
                        // The stop failed: treat the connection as still up and go back to Running.
                        | Choice2Of2 _ -> mb.Post(Started connectedState)
                    })
                    return! loop (Stopping [chan.Reply])
                | Stopping callbacks ->
                    return! loop (Stopping (chan.Reply :: callbacks))
                | NotRunning ->
                    // Already stopped, nothing to do.
                    chan.Reply(true)
                    return! loop status
                | Starting _ ->
                    chan.Reply(false)
                    return! loop status

            // "The connection is down." Posted by the background work started above:
            // after a successful stop, or after a failed start.
            | Stopped ->
                match status with
                | Stopping callbacks -> notifyAll callbacks true
                | Starting callbacks -> notifyAll callbacks false
                | NotRunning
                | Running _ -> ()
                return! loop NotRunning

            | Dispose chan ->
                // Release anybody still waiting on an in-flight start or stop;
                // its completion notice will be ignored by disposedLoop.
                match status with
                | Starting callbacks
                | Stopping callbacks -> notifyAll callbacks false
                | NotRunning
                | Running _ -> ()
                chan.Reply()
                // Leave the main loop, but keep answering commands (see disposedLoop).
                return! disposedLoop ()
        }

        // We defined the loops, now we start the main one with its initial state.
        loop NotRunning
    )

    /// <summary>
    /// Try to start asynchronously.
    /// </summary>
    /// <returns>
    /// Whether the start was successful. <c>false</c> if a stop is in progress,
    /// if starting failed, or if the connection has been disposed.
    /// </returns>
    member this.StartAsync() : Async<bool> =
        // This sends the Start command and asynchronously waits for the channel to receive a reply.
        agent.PostAndAsyncReply(Start)

    /// <summary>
    /// Try to stop asynchronously.
    /// </summary>
    /// <returns>
    /// Whether the stop was successful. <c>false</c> if a start is in progress,
    /// if stopping failed, or if the connection has been disposed.
    /// </returns>
    member this.StopAsync() : Async<bool> =
        // This sends the Stop command and asynchronously waits for the channel to receive a reply.
        agent.PostAndAsyncReply(Stop)

    interface System.IDisposable with
        /// Shuts the agent's main loop down. Safe to call more than once.
        member this.Dispose() =
            agent.PostAndReply(Dispose)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment