Created
April 4, 2018 22:49
-
-
Save mwolicki/6b73c6f57578fe6499cd8c30e48b672f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Control surface shared by supervised mailbox processors, so that
/// supervisors and children can reference each other without knowing the
/// message type the other side handles.
type ISupervisedMailboxProcessor =
    interface
        /// Requests a restart of the implementing processor.
        abstract member Restart : unit -> unit
        /// Registers another processor as a child of the implementing processor.
        abstract member AddChild : ISupervisedMailboxProcessor -> unit
    end
/// Envelope for everything that travels through a supervised mailbox:
/// user payloads plus the two control signals used for supervision.
type Message<'a> =
    /// A user-level payload of type 'a.
    | Message of 'a
    /// Control signal: the receiving processor should restart.
    | Restart
    /// Control signal: register the given processor as a dependent actor.
    | AddActor of ISupervisedMailboxProcessor
/// Raised when a `Restart` control message is handled; it carries no data and
/// is used purely as a signal to unwind the running actor body.
exception RestartException
/// A `MailboxProcessor` wrapper that adds simple supervision.
///
/// The user-supplied body `f` is run in an endless loop. Control messages
/// (`Restart`, `AddActor`) share the mailbox with user payloads and are
/// handled transparently inside `Receive` / `TryReceive`.
///
/// Failure policy of the loop:
///  * `RestartException` (raised when a `Restart` message is handled):
///    propagate the restart and clear the recorded failure.
///  * any other exception: print it; if its runtime type equals the recorded
///    type of the previous failure, propagate a restart and clear the record,
///    otherwise record its type and simply run `f` again.
///
/// `f` is the actor body; it receives this processor so it can call
/// `Receive` / `TryReceive`. `supervisor`, when given, is told about this
/// processor via `AddChild` at construction time and is sent `Restart`
/// whenever this processor propagates a restart.
///
/// NOTE(review): a restart received from a peer is propagated back to the
/// supervisor and to the children, which looks like it can bounce `Restart`
/// messages between a parent and its child indefinitely - confirm intended.
type SupervisedMailboxProcessor<'a> (f:SupervisedMailboxProcessor<'a> -> Async<unit>, ?supervisor:ISupervisedMailboxProcessor) as this =
    // Processors registered through AddActor; each gets Restart on propagation.
    let dependingActors = ResizeArray<ISupervisedMailboxProcessor> ()

    let mailbox = new MailboxProcessor<'a Message>(fun _ ->
        // Sends Restart to the supervisor (if any) and to every registered child.
        let restart () =
            supervisor |> Option.iter (fun x -> x.Restart ())
            for a in dependingActors do a.Restart ()

        // `state` is the runtime type of the last recorded failure, if any.
        // It is NOT cleared when `f` completes normally.
        let rec loop state = async {
            let! state = async {
                try
                    do! f this
                    return state
                with
                | :? RestartException ->
                    restart ()
                    return None
                | ex ->
                    printfn "%A" ex
                    let exState = ex.GetType () |> Some
                    if exState = state then
                        // Same failure type as last time: escalate.
                        restart ()
                        return None
                    else
                        return exState }
            return! loop state }
        loop None)

    do
        // Register with the supervisor so it can restart us later.
        supervisor |> Option.iter (fun x -> x.AddChild this)

    /// Processes a control message. `Restart` raises `RestartException`;
    /// `AddActor` records the child. Must never be called with a user
    /// `Message` (fails with "Impossible ...").
    member private __.Handle m =
        match m with
        | Message _ -> failwithf "Impossible %A" m
        | Restart -> raise RestartException
        | AddActor a -> dependingActors.Add a

    /// Starts the underlying mailbox loop.
    member __.Start () = mailbox.Start () 

    /// Number of messages (user and control) currently queued.
    member __.CurrentQueueLength = mailbox.CurrentQueueLength

    /// Posts a user payload, wrapped as `Message msg`.
    member __.Post msg = mailbox.Post (Message msg)

    /// Waits up to `timeout` milliseconds for the next user payload.
    /// Control messages that arrive first are handled and do not count as a
    /// result. Returns `None` when the time budget is exhausted.
    ///
    /// The budget covers the whole call: after a control message is handled
    /// only the remaining time is waited, so a stream of control messages
    /// cannot extend the wait beyond `timeout`. A negative `timeout` is passed
    /// to the mailbox unchanged (presumably meaning "wait forever", as with
    /// `MailboxProcessor.TryReceive`).
    ///
    /// Raises `RestartException` if a `Restart` control message is handled.
    member s.TryReceive (timeout : int) = async {
        let clock = System.Diagnostics.Stopwatch.StartNew ()
        let rec attempt () = async {
            let remaining =
                if timeout < 0 then timeout
                else int (max 0L (int64 timeout - clock.ElapsedMilliseconds))
            let! msg = mailbox.TryReceive remaining
            match msg with
            | None -> return None
            | Some (Message x) -> return Some x
            | Some control ->
                s.Handle control
                return! attempt () }
        return! attempt () }

    /// Waits for the next user payload, handling any control messages that
    /// arrive before it. Raises `RestartException` if a `Restart` control
    /// message is handled.
    member s.Receive () : Async<'a> = async {
        let! msg = mailbox.Receive ()
        match msg with
        | Message m -> return m
        | x ->
            s.Handle x
            return! s.Receive () }

    /// Queues a `Restart` control message for this processor.
    member private __.Restart () = mailbox.Post Restart

    /// Queues an `AddActor` control message registering `child`.
    member private __.AddChild child = mailbox.Post (AddActor child)

    interface ISupervisedMailboxProcessor with
        override __.Restart () = this.Restart ()
        override __.AddChild child = this.AddChild child
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment