Skip to content

Instantly share code, notes, and snippets.

@mwolicki
Created April 4, 2018 22:49
Show Gist options
  • Save mwolicki/6b73c6f57578fe6499cd8c30e48b672f to your computer and use it in GitHub Desktop.
Save mwolicki/6b73c6f57578fe6499cd8c30e48b672f to your computer and use it in GitHub Desktop.
type ISupervisedMailboxProcessor =
abstract Restart : unit -> unit
abstract AddChild : ISupervisedMailboxProcessor -> unit
type 'a Message =
| Message of 'a
| Restart
| AddActor of ISupervisedMailboxProcessor
exception RestartException
type SupervisedMailboxProcessor<'a> (f:SupervisedMailboxProcessor<'a> -> Async<unit>, ?supervisor:ISupervisedMailboxProcessor) as this =
let dependingActors = ResizeArray<ISupervisedMailboxProcessor> ()
let mailbox = new MailboxProcessor<'a Message>(fun _ ->
let restart () =
supervisor |> Option.iter (fun x->x.Restart ())
for a in dependingActors do a.Restart ()
let rec loop state = async {
let! state = async {
try
do! f this
return state
with
| :? RestartException ->
restart ()
return None
| ex ->
printfn "%A" ex
let exState = ex.GetType () |> Some
if exState = state then
restart ()
return None
else
return exState }
return! loop state }
loop None)
do
supervisor |> Option.iter (fun x->x.AddChild this)
member private __.Handle m =
match m with
| Message _ -> failwithf "Impossible %A" m
| Restart -> raise RestartException
| AddActor a -> dependingActors.Add a
member __.Start () = mailbox.Start ()
member __.CurrentQueueLength = mailbox.CurrentQueueLength
member __.Post msg = mailbox.Post (Message msg)
member s.TryReceive timeout = async {
let! msg = mailbox.TryReceive timeout
match msg with
| None -> return None
| Some (Message x) -> return Some x
| Some x ->
s.Handle x
return! s.TryReceive timeout }
member s.Receive () : Async<'a> = async {
let! msg = mailbox.Receive ()
match msg with
| Message m -> return m
| x ->
s.Handle x
return! s.Receive () }
member private __.Restart () = mailbox.Post Restart
member private __.AddChild child = mailbox.Post (AddActor child)
interface ISupervisedMailboxProcessor with
override __.Restart () = this.Restart ()
override __.AddChild child = this.AddChild child
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment