// Gist by @Tarmil (last active July 15, 2020 13:54).
// A reply to: https://twitter.com/davidfowl/status/1282445306261471232
// This implements a connection of some kind that can be started and stopped asynchronously.
// We must ensure that we don't try to start it while it is stopping or vice-versa.
// A few internal types first...
/// State that is carried around for as long as the connection is up.
/// Currently this is just an integer identifier.
type private ConnectedState =
    { Id: int }
/// What the internal agent currently knows about the connection, together with
/// the reply callbacks that are waiting for the next status change.
type private ConnectionStatus =
    /// No connection exists and none is being established.
    | NotRunning
    /// A start is in flight; each callback is invoked with the outcome once it finishes.
    | Starting of onStarted: (bool -> unit) list
    /// The connection is up; carries the state held while connected.
    | Running of connectedState: ConnectedState
    /// A stop is in flight; each callback is invoked with the outcome once it finishes.
    | Stopping of onStopped: (bool -> unit) list
/// Messages understood by the internal agent: requests coming from the public API,
/// plus the completion notifications the agent posts back to itself.
/// An AsyncReplyChannel<T> is answered with Reply(value) once the request has been handled.
type private ConnectionCommand =
    /// Request to start; the channel receives whether the start succeeded.
    | Start of chan: AsyncReplyChannel<bool>
    /// Posted once the background start has finished, carrying the new connected state.
    | Started of connectedState: ConnectedState
    /// Request to stop; the channel receives whether the stop succeeded.
    | Stop of chan: AsyncReplyChannel<bool>
    /// Posted once the background stop has finished.
    | Stopped
    /// Request to shut down; the channel is answered once the request has been handled.
    | Dispose of chan: AsyncReplyChannel<unit>
// And then the main Connection.
/// A connection that can be started and stopped asynchronously.
/// Every request goes through one MailboxProcessor whose loop receives and handles a single
/// command per iteration, so a start is never initiated while a stop is in flight, or vice versa.
/// Disposing is idempotent; once disposed, StartAsync and StopAsync complete with false.
type Connection() =

    /// Opens the underlying connection and returns the state to hold on to while connected.
    let startAsyncCore () : Async<ConnectedState> = async {
        // TODO: implement actual connection logic here.
        let connectedState = { Id = 42 }
        return connectedState
    }

    /// Closes the underlying connection described by connectedState.
    let stopAsyncCore (connectedState: ConnectedState) : Async<unit> = async {
        // TODO: implement actual disconnection logic here.
        return ()
    }

    /// Answers every waiting caller with the same result.
    let notifyAll (result: bool) (callbacks: list<bool -> unit>) : unit =
        for callback in callbacks do
            callback result

    let agent = MailboxProcessor<ConnectionCommand>.Start(fun mb ->

        // Loop used once Dispose has been processed. The agent keeps answering instead of exiting,
        // so that a second Dispose() or a late StartAsync/StopAsync gets a reply rather than
        // waiting forever on a mailbox nobody reads.
        let rec disposedLoop () = async {
            let! command = mb.Receive()
            match command with
            | Start chan
            | Stop chan ->
                // A disposed connection can neither be started nor stopped.
                chan.Reply(false)
                return! disposedLoop ()
            | Dispose chan ->
                // Already disposed: acknowledge so that repeated Dispose() calls return.
                chan.Reply()
                return! disposedLoop ()
            | Started _
            | Stopped ->
                // Completion of work that was still in flight when Dispose arrived; nobody is waiting for it.
                // NOTE(review): a connection reported by a late Started is not closed here.
                return! disposedLoop ()
        }

        // The body of the agent is a simple loop that receives and processes commands,
        // implemented as a recursive function whose argument is the current status.
        let rec loop (status: ConnectionStatus) = async {
            match! mb.Receive() with
            // We received the order to start. We can only do that if we were not running.
            | Start chan ->
                match status with
                | NotRunning ->
                    // Run the start outside the loop using Async.Start,
                    // so that commands can keep being processed in the meantime.
                    // NOTE(review): if startAsyncCore raises, Started is never posted, so the status stays
                    // Starting and the waiting callers are never answered. Reporting the failure needs a
                    // dedicated failure case in ConnectionCommand.
                    Async.Start(async {
                        let! connectedState = startAsyncCore ()
                        // Once connected, post Started back to ourselves to update the state and notify callbacks.
                        mb.Post(Started connectedState)
                    })
                    return! loop (Starting [chan.Reply])
                | Starting callbacks ->
                    // Already starting; just register this caller to be notified when that finishes.
                    return! loop (Starting (chan.Reply :: callbacks))
                | Running _ ->
                    // Already running, nothing to do.
                    chan.Reply(true)
                    return! loop status
                | Stopping _ ->
                    // Cannot start while a stop is in flight: reply "nope" and keep the same state.
                    chan.Reply(false)
                    return! loop status

            // Sent from the Async.Start above. At this point the state is expected to be Starting.
            | Started connectedState ->
                match status with
                | Starting callbacks ->
                    // Notify the caller (and any others who called StartAsync in the meantime).
                    notifyAll true callbacks
                    return! loop (Running connectedState)
                | NotRunning
                | Running _
                | Stopping _ ->
                    // Not expected to happen. Keep the current state instead of overwriting it.
                    return! loop status

            // Mirror image of Start.
            | Stop chan ->
                match status with
                | Running connectedState ->
                    // NOTE(review): same caveat as for the start: if stopAsyncCore raises, Stopped is never posted.
                    Async.Start(async {
                        do! stopAsyncCore connectedState
                        mb.Post(Stopped)
                    })
                    return! loop (Stopping [chan.Reply])
                | Stopping callbacks ->
                    return! loop (Stopping (chan.Reply :: callbacks))
                | NotRunning ->
                    // Already stopped, nothing to do.
                    chan.Reply(true)
                    return! loop status
                | Starting _ ->
                    // Cannot stop while a start is in flight.
                    chan.Reply(false)
                    return! loop status

            // Mirror image of Started. The state is expected to be Stopping.
            | Stopped ->
                match status with
                | Stopping callbacks ->
                    notifyAll true callbacks
                    return! loop NotRunning
                | NotRunning
                | Starting _
                | Running _ ->
                    // Not expected to happen. Keep the current state instead of overwriting it.
                    return! loop status

            | Dispose chan ->
                // Callers still waiting on an in-flight start or stop would otherwise never be answered,
                // because this loop is left for good and their callbacks would be dropped with it.
                let pending =
                    match status with
                    | Starting callbacks
                    | Stopping callbacks -> callbacks
                    | NotRunning
                    | Running _ -> []
                notifyAll false pending
                // NOTE(review): a Running connection is not torn down here; stopAsyncCore is only
                // invoked through an explicit StopAsync.
                chan.Reply()
                return! disposedLoop ()
        }

        // We defined the loops, now we start the main one with its initial state.
        loop NotRunning
    )

    /// <summary>
    /// Try to start asynchronously.
    /// </summary>
    /// <returns>
    /// Whether the start was successful. False when a stop is in progress or the connection has been disposed.
    /// </returns>
    member this.StartAsync() : Async<bool> =
        // This sends the Start command and asynchronously waits for the channel to receive a reply.
        agent.PostAndAsyncReply(Start)

    /// <summary>
    /// Try to stop asynchronously.
    /// </summary>
    /// <returns>
    /// Whether the stop was successful. False when a start is in progress or the connection has been disposed.
    /// </returns>
    member this.StopAsync() : Async<bool> =
        // This sends the Stop command and asynchronously waits for the channel to receive a reply.
        agent.PostAndAsyncReply(Stop)

    interface System.IDisposable with
        /// Marks the connection as disposed and releases any callers still waiting on a start or stop.
        /// Blocks until the agent has acknowledged the request. Safe to call more than once.
        member this.Dispose() =
            agent.PostAndReply(Dispose)
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment