Skip to content

Instantly share code, notes, and snippets.

@bent-rasmussen
Last active September 12, 2023 04:23
Show Gist options
  • Save bent-rasmussen/8152df3c842a06acabe219dd877802d5 to your computer and use it in GitHub Desktop.
Save bent-rasmussen/8152df3c842a06acabe219dd877802d5 to your computer and use it in GitHub Desktop.
Experimental high performance actor implementation for F# using channel and task computation expression.
// NOTE: import this Nuget package: TaskBuilder.fs (written using 2.1.0)
//
// Tested in LINQPad (hence Dump method usage).
open System
open System.Collections
open System.Collections.Generic
open System.Diagnostics
open System.Linq
open System.Threading
open System.Threading.Tasks
open System.Threading.Channels
open FSharp.Control.Tasks // Nuget package: TaskBuilder.fs 2.1.0
// Rough sketch of a high-performance actor; a potential alternative to
// MailboxProcessor, the built-in F# implementation. This actor implementation
// uses System.Threading.Channels.Channel and TaskBuilder to achieve its
// performance.
// TODO testing
// TODO more features
// TODO lost letters
// TODO unhandled exceptions
// TODO exception handling exceptions
/// Contract that every actor message type must satisfy: a general
/// exception handler. When the message handler throws while processing a
/// message, the actor offers the exception to that message. Cases that carry
/// a TaskCompletionSource can forward it with SetException, so errors reach
/// the caller without a try-catch block in every message case.
[<RequireQualifiedAccess>]
type IActorMessage =
    /// Offered the exception raised while this message was being handled.
    /// A message with a callback mechanism can propagate it back to the
    /// caller; otherwise the exception is lost.
    // TODO "lost letters" outbox for unhandled top-level exceptions.
    abstract TryHandleException : Exception -> bool
/// Lifecycle states of an actor.
[<RequireQualifiedAccess>]
type ActorState =
    /// Initial state: the actor has been created but not started.
    | New
    /// The actor is running and accepts messages.
    | Started
    /// The actor is shutting down: new messages are refused, while the
    /// messages already queued are still processed.
    | StoppingGracefully
    /// Terminal state: the actor has stopped.
    | Stopped
/// Public surface of an actor that processes messages of type 'Msg.
type IActor<'Msg when 'Msg :> IActorMessage> =

    // ---- Queries ----

    /// Current lifecycle state.
    abstract State : ActorState

    // ---- Commands (lifecycle) ----

    /// Start the actor.
    abstract Start : unit -> unit

    /// Process all queued up messages and then stop the actor.
    // TODO Task
    abstract StopGracefully : unit -> unit

    /// Stop the actor immediately.
    abstract Stop : unit -> unit

    // ---- Commands (communication) ----

    /// Fire and forget: post a message without waiting for any reply.
    abstract Tell : 'Msg -> unit

    /// Ask and reply synchronously. The argument builds the message from the
    /// TaskCompletionSource through which the reply is delivered.
    abstract Ask : (TaskCompletionSource<'Reply> -> 'Msg) -> 'Reply

    /// Ask and reply asynchronously. Same as Ask, but the reply is returned
    /// as a Task.
    abstract AskAsync : (TaskCompletionSource<'Reply> -> 'Msg) -> Task<'Reply>
/// Function that processes one message. It receives the actor together with
/// the message (as a tuple) and returns a task that completes when the
/// message has been handled.
and ActorMessageHandler<'Msg when 'Msg :> IActorMessage> =
    (IActor<'Msg> * 'Msg) -> Task<unit>
/// Actor implementation backed by a channel inbox and a single worker task.
///
/// The worker reads messages from the inbox and awaits the message handler
/// for each one before reading the next, so messages are handled one at a
/// time. Lifecycle commands (Start, StopGracefully, Stop) are serialized
/// through a private lock.
and Actor<'Msg when 'Msg :> IActorMessage>
    private (inbox : Channel<'Msg>, messageHandler : ActorMessageHandler<'Msg>) as self =

    /// Lock object guarding the lifecycle commands.
    let gate = new Object()
    /// Cancelled by Stop to interrupt the worker.
    let cts = new CancellationTokenSource()
    let mutable state = ActorState.New
    let mutable worker = None

    /// Raises unless the actor is currently in the Started state.
    let assertStarted () =
        match state with
        | ActorState.Started ->
            ()
        | ActorState.New -> failwith "Actor not started."
        | ActorState.Stopped -> failwith "Actor stopped."
        | ActorState.StoppingGracefully -> failwith "Actor stopping."

    /// Empties the inbox once the worker has finished, offering every message
    /// that will never be handled an OperationCanceledException through
    /// TryHandleException. Messages carrying a reply callback can use it to
    /// fail their waiting caller instead of leaving it blocked forever.
    let rejectPending () =
        let stopped = OperationCanceledException("Actor stopped.") :> exn
        let mutable more = true
        while more do
            let (ok, msg) = inbox.Reader.TryRead()
            more <- ok
            if ok then
                try
                    msg.TryHandleException(stopped) |> ignore
                with _ ->
                    () // TODO internal error propagation

    /// Worker loop. Runs until the inbox is completed and drained (graceful
    /// stop) or until the token is cancelled (immediate stop).
    let work (cancellationToken : CancellationToken) =
        task {
            state <- ActorState.Started
            try
                try
                    let mutable inboxOpen = true
                    while inboxOpen && not cancellationToken.IsCancellationRequested do
                        let! available = inbox.Reader.WaitToReadAsync(cancellationToken)
                        if not available then
                            // WaitToReadAsync yields false only once the writer
                            // is completed and nothing is left to read. Leave
                            // the loop; waiting again would return false
                            // immediately and spin forever.
                            inboxOpen <- false
                        else
                            let mutable read = true
                            // Re-check the token between messages so that Stop
                            // takes effect without draining the whole queue.
                            while read && not cancellationToken.IsCancellationRequested do
                                let (ok, msg) = inbox.Reader.TryRead()
                                read <- ok
                                if ok then
                                    try
                                        do! messageHandler(self :> IActor<'Msg>, msg)
                                    with messageHandlerEx ->
                                        try
                                            if not (msg.TryHandleException(messageHandlerEx)) then
                                                () // TODO internal error propagation
                                        with _ ->
                                            () // TODO internal error propagation
                with :? OperationCanceledException ->
                    // Stop cancelled a pending WaitToReadAsync: this is the
                    // normal immediate-stop path, not a worker failure.
                    ()
            finally
                rejectPending()
                state <- ActorState.Stopped
        }

    /// Creates an actor with an unbounded inbox. The actor is returned in the
    /// New state and must be started before it accepts messages.
    static member NewUnbounded (handler : ActorMessageHandler<'Msg>) : IActor<'Msg> =
        let ch = Channel.CreateUnbounded<'Msg>()
        new Actor<'Msg>(ch, handler) :> IActor<'Msg>

    interface IActor<'Msg> with

        /// Current lifecycle state.
        member this.State : ActorState =
            state

        /// Starts the worker on the first call; later calls do nothing.
        member this.Start () =
            // Checked again under the lock so that concurrent callers start
            // at most one worker.
            if worker.IsNone then
                lock gate (fun () ->
                    if worker.IsNone then
                        worker <- Some (work(cts.Token))
                )

        /// Stops accepting messages and lets the worker finish the messages
        /// already queued. Calling it again while stopping is a no-op.
        /// Raises if the actor has not been started or has already stopped.
        member this.StopGracefully () =
            lock gate (fun () ->
                match state with
                | ActorState.Started ->
                    state <- ActorState.StoppingGracefully
                    // TryComplete rather than Complete: the writer may already
                    // have been completed by an earlier Stop.
                    inbox.Writer.TryComplete() |> ignore
                | ActorState.StoppingGracefully ->
                    () // Idempotent
                | ActorState.New
                | ActorState.Stopped ->
                    failwith "Actor must be started."
            )

        /// Stops the actor without processing the remaining queue. Queued
        /// messages are offered an OperationCanceledException through
        /// TryHandleException. Safe to call repeatedly and after
        /// StopGracefully. Raises if the actor was never started.
        member this.Stop () =
            lock gate (fun () ->
                match state with
                | ActorState.New ->
                    failwith "Actor must have started first."
                | ActorState.Started
                | ActorState.StoppingGracefully ->
                    // The writer is already completed after StopGracefully or
                    // an earlier Stop, so a false result is not an error.
                    inbox.Writer.TryComplete() |> ignore
                    cts.Cancel()
                | ActorState.Stopped ->
                    () // Idempotent
            )

        /// Posts a message without waiting for a reply. Raises unless the
        /// actor is in the Started state.
        member this.Tell (message : 'Msg) : unit =
            assertStarted()
            if not (inbox.Writer.TryWrite(message)) then
                failwith "Unexpected write failure (unbounded channel write failure)."

        /// Posts a message and blocks the calling thread until the reply
        /// arrives.
        member this.Ask (messageBuilder : TaskCompletionSource<'Reply> -> 'Msg) : 'Reply =
            (this :> IActor<_>).AskAsync(messageBuilder).GetAwaiter().GetResult()

        /// Posts a message built around a fresh TaskCompletionSource and
        /// returns the task that completes when the handler sets the reply.
        member this.AskAsync (messageBuilder : TaskCompletionSource<'Reply> -> 'Msg) : Task<'Reply> =
            assertStarted()
            let completion = new TaskCompletionSource<'Reply>()
            let message = messageBuilder completion
            (this :> IActor<_>).Tell(message)
            completion.Task
/// Messages understood by the test actor.
type TestMesssage =
    /// Dump a greeting; no reply.
    | Hello of string
    /// Negate the boolean and reply with the result.
    | Not of bool * TaskCompletionSource<bool>
    /// Increment the actor's counter; no reply.
    | Incr
    /// Reply with the current counter value.
    | Counter of TaskCompletionSource<int>
    interface IActorMessage with
        /// Forwards a handler exception to the waiting caller for every case
        /// that carries a reply callback. Returns false for the
        /// fire-and-forget cases, which have nobody to notify.
        member this.TryHandleException (error : exn) =
            // Every case is listed explicitly so that a newly added message
            // with a reply callback cannot silently fall through to false.
            match this with
            | Not (_, reply) -> reply.TrySetException(error)
            | Counter reply -> reply.TrySetException(error)
            | Hello _
            | Incr -> false
/// Demo actor: dumps greetings, negates booleans on request and keeps a
/// counter that can be incremented and queried.
let testActor =
    let random = new Random()
    let mutable total = 0
    // Handles a single message; `actor` is only needed by the commented-out
    // self-stop experiment below.
    let handle (actor : IActor<TestMesssage>, msg : TestMesssage) =
        task {
            match msg with
            | Hello text ->
                text.Dump("Hello")
            | Incr ->
                total <- total + 1
            | Counter reply ->
                reply.SetResult(total)
            | Not (value, reply) ->
                let negated = not value
                // Both branches currently reply; the else branch is where the
                // experiments below can be enabled to simulate a random failure.
                if rnd_roll random > 0 then
                    reply.SetResult(negated)
                else
                    reply.SetResult(negated)
                    //actor.Stop() // stop the actor by just calling stop on itself
                    //failwith "oops, unexpected error"
        }
    Actor.NewUnbounded(handle)
// Bring the actor to life before sending anything.
testActor.Start()
// Fire and forget: post without waiting for a reply.
testActor.Tell(Hello "world")
// Synchronous ask: post, then block until the reply arrives.
testActor.Ask(fun tcs -> Not (true, tcs)).Dump("Ask(Not true)")
testActor.Ask(fun tcs -> Not (false, tcs)).Dump("Ask(Not false)")
// Asynchronous ask from inside a task computation expression.
let job =
    task {
        let! negatedTrue = testActor.AskAsync(fun reply -> Not (true, reply))
        let! negatedFalse = testActor.AskAsync(fun reply -> Not (false, reply))
        negatedTrue.Dump("AskAsync(Not true)")
        negatedFalse.Dump("AskAsync(Not false)")
    }
job.GetAwaiter().GetResult() // block until the task has finished
// Throughput test: 1,000,000 fire-and-forget messages, followed by one ask
// that returns only after all of them have been processed.
let clock = Stopwatch.StartNew()
for _iteration in 1 .. 1_000_000 do
    testActor.Tell(Incr)
let counter = testActor.Ask(fun reply -> Counter reply)
counter.Dump("Counter")
clock.Elapsed.TotalSeconds.Dump("Seconds elapsed")
// Round-trip test: 1,000,000 messages, each awaited for its reply before the
// next one is sent.
let notting =
    task {
        let clock = Stopwatch.StartNew()
        let mutable flag = false
        for _round in 1 .. 1_000_000 do
            let! negated = testActor.AskAsync(fun reply -> Not (flag, reply))
            flag <- negated
        flag.Dump("b")
        clock.Elapsed.TotalSeconds.Dump("Seconds elapsed")
    }
notting.GetAwaiter().GetResult()
// Shut the actor down...
testActor.Stop()
// ...after which it refuses further messages.
//testActor.Tell(Hello "world again")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment