Last active September 1, 2016 17:27
Hopac Supervisor using Proc (based on and incorporated into Hopac.Plus)
// Source:
namespace Hopac.Plus.Supervision
open Hopac
open Hopac.Infixes
open Hopac.Plus
open Hopac.Plus.Extensions
/// What the supervisor does with a supervised job once that job has terminated
/// (see `Supervisor.handleTermination`).
type Policy =
/// Start the job again right after it terminated.
| Restart
/// Remove the job from supervision once it terminated.
| Terminate
/// Restart the job later; `restartDelay` is the alternative the supervisor
/// uses as the delay before the restart.
| Delayed of restartDelay:Alt<unit>
/// Acknowledgement token obtained by whoever received a one-time signal.
/// It wraps the `IVar<unit>` that `SignalAck.ack` writes to.
type SignalAck = SignalAck of IVar<unit>

module SignalAck =
    /// Job that acknowledges the signal by writing `()` into the wrapped IVar.
    let ack (SignalAck ackVar) : Job<unit> =
        IVar.fill ackVar ()
/// Receiving side of a one-time signal. The outer IVar carries the inner
/// IVar that is later used for the acknowledgement.
type OneTimeSignal = OneTimeSignal of IVar<IVar<unit>>

module OneTimeSignal =
    /// Alternative that becomes available with the value of the wrapped IVar,
    /// handing that value back wrapped as a `SignalAck`.
    let awaitSignal (OneTimeSignal signalVar) : Alt<SignalAck> =
        Alt.afterFun SignalAck (asAlt signalVar)
/// Sending side of a one-time signal. It wraps the same kind of
/// `IVar<IVar<unit>>` that `OneTimeSignal` exposes to the receiver.
type OneTimeSignalSource = OneTimeSignalSource of IVar<IVar<unit>>
module OneTimeSignalSource =
/// Creates a signal source around a fresh, empty IVar.
let create () : OneTimeSignalSource =
OneTimeSignalSource ^ IVar ()
/// Job that writes a fresh inner IVar into the wrapped IVar via `*<=`;
/// the inner IVar is never read back here, so no acknowledgement is awaited.
let trigger (OneTimeSignalSource uIC) : Job<unit> =
uIC *<= IVar ()
/// Triggers the signal and completes once the receiver has acknowledged it.
/// NOTE(review): the body below is visibly truncated (unclosed `[`, no code
/// that creates or awaits an acknowledgement IVar); restore it from the
/// upstream Hopac.Plus source before relying on this function.
let triggerAndAwaitAck (OneTimeSignalSource uIC) : Alt<unit> =
Alt.withNackJob ^ fun nack ->
[ nack ^=>. Alt.never uIC
/// Receiver-side view sharing the same underlying IVar as this source.
let signal (OneTimeSignalSource uIC) : OneTimeSignal =
OneTimeSignal uIC
/// Storage for the "last will" of a supervised job: an `MVar` holding an
/// optional boxed value that a restarted job can read back.
type WillLocker = WillLocker of MVar<obj option>

module WillLocker =
    /// Creates a locker that already holds `initial` (boxed).
    let create initial : WillLocker =
        WillLocker (MVar (Some (box initial)))

    /// Creates a locker that holds no will (`None`).
    let createEmpty () : WillLocker =
        WillLocker (MVar None)

    /// Job that replaces the stored will with `Some (box a)`.
    /// (`always` is presumably the constant function from Hopac.Plus.)
    let updateWill (WillLocker aM) (a:'a) : Job<unit> =
        asJob (MVar.mutateFun (always (Some (box a))) aM)

    /// Job that yields the stored will, unboxed to the requested type, or
    /// `None` when no will has been stored.
    ///
    /// The MVar is read with `MVar.read`, so the value stays in the locker
    /// and a later `updateWill` (which mutates the MVar in place) does not
    /// block on an emptied variable. The previous body, `unbox aM`, cast the
    /// MVar object itself to `Job<'a option>` instead of unboxing its content.
    let getLastWill (WillLocker aM) : Job<'a option> =
        MVar.read aM >>- Option.map unbox
/// Entry point of a supervised job: given the shutdown signal and the will
/// locker, it yields the job to run.
type JobInit = OneTimeSignal -> WillLocker -> Job<unit>
/// Name under which a job is registered with the supervisor.
[<StructuredFormatDisplay("JobHandle {Value}")>]
type JobHandle =
    | JobHandle of string
    /// The handle renders as the bare wrapped string.
    override this.ToString() =
        let (JobHandle name) = this
        name
    /// Text shown by structured formatting (`%A`); identical to `ToString()`.
    member this.Value = this.ToString()

module JobHandle =
    /// Wraps the given string as a handle.
    let create str = JobHandle str

    /// Creates a handle named after a freshly generated GUID.
    let createAnonymous () =
        let guid = System.Guid.NewGuid()
        JobHandle (string guid)
/// Static description of a supervised job as submitted on registration.
/// (Record braces restored; the field list alone is not a valid F# type.)
type MinionInfo =
    { /// Name the job is supervised under.
      handle : JobHandle
      /// What to do when the job terminates.
      policy : Policy
      /// Function that builds the job to run.
      job : JobInit }
/// Per-minion bookkeeping kept by the supervisor while the job is supervised.
/// (Record braces restored; the field list alone is not a valid F# type.)
type MinionState =
    { /// Registration data of the minion.
      info : MinionInfo
      /// Will locker handed to the job.
      locker : WillLocker
      /// Source of the shutdown signal handed to the job.
      shutdown : OneTimeSignalSource }
/// Numeric identifier the supervisor assigns to a started job.
[<StructuredFormatDisplay("JobId {Value}")>]
type JobId =
    | JobId of uint64
    /// The id renders as the decimal text of the wrapped number.
    override this.ToString() =
        let (JobId number) = this
        string number
    /// Text shown by structured formatting (`%A`); identical to `ToString()`.
    member this.Value = this.ToString()
/// Immutable state threaded through the supervisor loop.
/// (Record braces restored; the field list alone is not a valid F# type.)
type SupervisorState =
    { /// Number used for the next `JobId`; bumped whenever a minion is added.
      ident : uint64
      /// Bookkeeping of every running minion, keyed by job id.
      minions : Map<JobId, MinionState>
      /// Per running minion, an alternative yielding its job id (built from its `Proc`).
      processes : Map<JobId, Alt<JobId>>
      /// Pending delayed restarts, keyed by handle; each yields the handle and
      /// the state transition that performs the restart.
      delayed : Map<JobHandle, Alt<JobHandle * (SupervisorState -> Job<SupervisorState>)>>
      /// Counter bumped on every state change.
      version : uint64 }
/// Pure transitions and queries over `SupervisorState`. Every transition
/// bumps `version`.
module SupervisorState =
    /// State of a supervisor that supervises nothing yet.
    let initial =
        { ident = 0UL
          minions = Map.empty
          processes = Map.empty
          delayed = Map.empty
          version = 0UL }

    /// Drops the minion `jobId` from `minions` and `processes`; when the minion
    /// was known, a delayed entry under its handle is dropped as well.
    let removeMinion jobId state =
        let minionStateO = state.minions |> Map.tryFind jobId
        { state with
            version = state.version + 1UL
            processes = Map.remove jobId state.processes
            minions = Map.remove jobId state.minions
            delayed =
                match minionStateO with
                | Some { info = { handle = jh } } -> Map.remove jh state.delayed
                | None -> state.delayed }

    /// Records a started minion under `jobId` and advances `ident`. The stored
    /// process alternative is `p` mapped to `jobId`.
    let addMinion jobId minionState (p : Proc) state =
        { state with
            version = state.version + 1UL
            ident = state.ident + 1UL
            processes =
                Map.add jobId (p ^-> (fun () -> jobId)) state.processes
            minions =
                Map.add jobId minionState state.minions }

    /// Records a pending delayed restart under `handle`.
    let addDelayed handle promise state =
        { state with
            version = state.version + 1UL
            delayed = Map.add handle promise state.delayed }

    /// Forgets the pending delayed restart stored under `handle`, if any.
    let removeDelayed handle state =
        { state with
            version = state.version + 1UL
            delayed = Map.remove handle state.delayed }

    /// Handles of all supervised jobs: the running minions first, followed by
    /// the jobs waiting for a delayed restart.
    let jobNames state =
        let running =
            state.minions
            |> Map.toList
            |> List.map (fun (_, { info = { handle = h } }) -> h)
        let delayed =
            state.delayed
            |> Map.toList
            |> List.map fst
        List.concat [running;delayed]

    /// Finds the running minion registered under `handle`; returns its job id
    /// together with its state, or `None` when no running minion has that handle.
    let jobState state handle =
        state.minions
        |> Map.tryPick (fun jid s -> if s.info.handle = handle then Some (jid,s) else None)
/// Client-side handle of a running supervisor: the two channels its loop serves.
/// (Record braces restored; the field list alone is not a valid F# type.)
type Supervisor =
    { /// Channel on which new minions are submitted for supervision.
      register : Ch<MinionInfo>
      /// Channel on which handles are submitted to be removed from supervision.
      unregister : Ch<JobHandle> }
module Supervisor =
/// Starts `minionInfo` under supervision and returns the updated state.
///
/// - `logger`: receives progress messages.
/// - `willLocker`: `Some locker` reuses an existing will locker (restart);
///   `None` gives the minion a fresh empty one.
/// - Returns the state unchanged when the handle is already supervised,
///   running or delayed.
///
/// Restored from a truncated extraction: the `else` branch separator and the
/// second argument of `minionInfo.job` (the will locker demanded by `JobInit`).
let startMinion logger minionInfo willLocker state =
    logger <| sprintf "Starting minion %O" minionInfo.handle
    if SupervisorState.jobNames state |> List.contains minionInfo.handle then
        logger <| sprintf "New minion not started; %O already supervised" minionInfo.handle
        Job.result state
    else
        // The id is taken from the current counter; addMinion advances it.
        let jobId = JobId state.ident
        let minionState =
            { info = minionInfo
              locker = willLocker |> Option.orDefault ^ WillLocker.createEmpty ()
              shutdown = OneTimeSignalSource.create () }
        Proc.start (minionInfo.job (OneTimeSignalSource.signal minionState.shutdown) minionState.locker)
        >>- fun p ->
            logger <| sprintf "Minion %O (%A) started" minionInfo.handle jobId
            SupervisorState.addMinion jobId minionState p state
/// Removes the job registered under `handle` from supervision and returns the
/// updated state. A running minion is sent its shutdown signal and removed
/// once it acknowledged; otherwise only a pending delayed restart under that
/// handle is dropped.
let unregisterMinion logger state handle =
    sprintf "Unregistering %A started" handle |> logger
    let running = SupervisorState.jobState state handle
    match running with
    | None ->
        sprintf "Received request to unregister unknown job %A" handle |> logger
        let withoutDelayed = SupervisorState.removeDelayed handle state
        Job.result withoutDelayed
    | Some (jobId, minionState) ->
        sprintf "Shutting down %A (%A)" handle jobId |> logger
        let acknowledged = OneTimeSignalSource.triggerAndAwaitAck minionState.shutdown
        acknowledged >>-. SupervisorState.removeMinion jobId state
/// Reacts to the termination of the minion `jobId` according to its policy and
/// returns the updated state.
///
/// - `Terminate`: the minion is removed from supervision.
/// - `Restart`: the minion is removed and started again with its old will locker.
/// - `Delayed delay`: the minion is removed and a memoized promise is stored
///   under its handle; once `delay` completes, the promise yields the handle
///   and the state transition that restarts the minion.
///
/// Raises `KeyNotFoundException` (from `Map.find`) when `jobId` is not a known minion.
///
/// Restored from a truncated extraction: the match scrutinee, the handle
/// argument of each log message (the formats have one more hole than the
/// arguments that were left), the arguments of the `startMinion` calls (in
/// the order of its signature) and the sources of the two pipelines.
let handleTermination logger state jobId =
    let minionState = Map.find jobId state.minions
    match minionState.info.policy with
    | Terminate ->
        logger <| sprintf "%A (%A) terminated; removing from supervision" minionState.info.handle jobId
        Job.result ^ SupervisorState.removeMinion jobId state
    | Restart ->
        logger <| sprintf "%A (%A) terminated; restarting" minionState.info.handle jobId
        SupervisorState.removeMinion jobId state
        |> startMinion logger minionState.info (Some minionState.locker)
    | Delayed delay ->
        logger <| sprintf "%A (%A) terminated; restarting in %A" minionState.info.handle jobId delay
        // The restart itself is deferred: the promise only carries the
        // transition, which the supervisor loop applies to its then-current state.
        let promise =
            delay
            >>-. (minionState.info.handle, startMinion logger minionState.info (Some minionState.locker))
            |> memo
        state
        |> SupervisorState.removeMinion jobId
        |> SupervisorState.addDelayed minionState.info.handle promise
        |> Job.result
/// Shuts every running minion down and then acknowledges the shutdown request.
///
/// Each minion is sent its shutdown signal and awaited in sequence. The whole
/// sequence races against a 1000 ms timeout; whichever finishes first is
/// logged, and `ack` is acknowledged afterwards in both cases.
///
/// Restored from a truncated extraction: the `Seq.map` over the minions, the
/// `Job.map` in front of the logging function and the operand after the
/// dangling `shutdownAll |>` (an identity cast, so both list elements are `Alt<unit>`).
let executeShutdown logger state ack =
    let shutdownMinion (minionState : MinionState) =
        OneTimeSignalSource.triggerAndAwaitAck minionState.shutdown
    let shutdownAll =
        Job.seqIgnore (state.minions |> Map.toSeq |> Seq.map (snd >> shutdownMinion))
        |> Job.map (fun () -> logger "All minions shutdown")
        |> memo
    logger "Shutting down minions!"
    Alt.choose [
        asAlt shutdownAll
        timeOutMillis 1000 |> Alt.afterFun (fun () -> logger "Minion shutdown timed out without all minions shutting down cleanly")
    ] >>=. SignalAck.ack ack
/// Creates a supervisor: starts its server loop and returns the record with
/// the channels used to register and unregister minions.
///
/// - `shutdown`: one-time signal that ends the loop after all minions were shut down.
/// - `logger`: receives progress messages.
///
/// Each loop iteration serves exactly one event: the shutdown signal, a due
/// delayed restart, a registration, an unregistration or the termination of
/// a minion. Every event except shutdown yields a new state and recurses.
///
/// Restored from a truncated extraction: `List.map`/`Seq.map` in the
/// pipelines, the first argument of the two-hole `failwithf`, the handle
/// projection in `minionNames`, the sources of the delayed/termination
/// pipelines and the closing bracket of the outer `Alt.choose`.
let create shutdown logger =
    let registerCh = Ch()
    let unregisterCh = Ch()
    let rec loop state =
        // Sanity check; should fail hard if these fail...
        // These should be removed after testing.
        let processIds = state.processes |> Map.toList |> List.map fst
        if processIds <> (state.minions |> Map.toList |> List.map fst) then
            failwithf "Unmatched process and minion maps\nprocesses: %A\n minions: %A"
                processIds
                (state.minions |> Map.toList |> List.map fst)
        if processIds <> List.distinct processIds then
            failwithf "duplicate process ids?"
        let minionNames = state.minions |> Map.toList |> List.map (snd >> fun ms -> ms.info.handle)
        if minionNames <> List.distinct minionNames then
            failwithf "duplicate minion handles detected\n%A" minionNames
        logger <| "Current state version: " + string state.version
        Alt.choose [
            // shutdown
            OneTimeSignal.awaitSignal shutdown ^=> executeShutdown logger state
            // anything else will create a new state and then recurse into the loop
            Alt.choose [
                // process delayed restarts
                state.delayed
                |> Map.toSeq
                |> Seq.map snd
                |> Alt.choose
                |> Alt.afterJob
                    (fun (delayName, restart) ->
                        state
                        |> SupervisorState.removeDelayed delayName
                        |> restart)
                // register new minion
                registerCh ^=>
                    fun minionInfo ->
                        startMinion logger minionInfo None state
                // unregister minion
                unregisterCh ^=> unregisterMinion logger state
                // handle termination
                state.processes
                |> Map.toSeq
                |> Seq.map snd
                |> Alt.choose
                |> Alt.afterJob ^ handleTermination logger state
            ] |> Alt.afterJob loop
        ]
    Job.start ^ loop SupervisorState.initial
    >>-. { register = registerCh; unregister = unregisterCh }
/// Alternative that submits a new minion to supervisor `s`: handle `h`,
/// policy `p` and job builder `init` are packed into a `MinionInfo` and given
/// on the supervisor's register channel.
let start s h p init =
    let minionInfo = { handle = h; policy = p; job = init }
    Ch.give s.register minionInfo
/// Alternative that asks supervisor `s` to stop supervising the job with
/// handle `h` by giving the handle on the supervisor's unregister channel.
let stop s h =
    Ch.give s.unregister h
// Source:
(*** hide ***)
#I "../../bin/Hopac.Plus"
(** Supervision example *)
(*** do-not-eval ***)
#r "Hopac.Core"
#r "Hopac"
#r "Hopac.Platform"
#r "Hopac.Plus"
open Hopac
open Hopac.Infixes
open Hopac.Plus.Supervision
open Hopac.Plus.Extensions
/// Example minion: a counter that ticks every 10 ms until it is told to shut down.
///
/// - `name`: label printed with every tick.
/// - `failIf`: predicate on the counter; when it holds the tick raises "boom".
/// - `shutdown`: signal that ends the minion; it is acknowledged on receipt.
/// - `locker`: will locker in which the next counter value is stored after
///   every tick; a stored value is used as the starting counter, otherwise 0.
let testMinion name failIf shutdown locker : Job<unit> =
    let rec loop state =
        // Stop: acknowledge the shutdown signal and end.
        let onShutdown =
            OneTimeSignal.awaitSignal shutdown ^=> SignalAck.ack
        // Tick: fail or print, then persist the next counter and continue.
        let report () =
            if failIf state then failwith "boom" else printfn "test [%O - %d]" name state
        let advance () =
            let newState = state + 1
            WillLocker.updateWill locker newState >>=. loop newState
        let onTick =
            timeOutMillis 10
            |> Alt.afterFun report
            |> Alt.afterJob advance
        Alt.choose [ onShutdown; onTick ] |> asJob
    let lastWill = WillLocker.getLastWill locker
    lastWill
    |> OptionJob.bindJob loop
    |> OptionJob.orDefaultJob (loop 0)
// Signal source used at the end of the script to shut the supervisor down.
let shutdown = OneTimeSignalSource.create ()
// Create the supervisor; it watches the signal above and logs through printfn.
let sup = run <| Supervisor.create (OneTimeSignalSource.signal shutdown) (printfn "%s")
// Random source for the failure predicate of the second minion.
let rand = System.Random()
// Two GUID-named handles, one per minion.
let job1 = JobHandle.createAnonymous ()
let job2 = JobHandle.createAnonymous ()
// First minion: never fails, policy Restart.
queue <| Supervisor.start sup job1 Restart (testMinion "test1" (fun _ -> false))
// Second minion: fails whenever the random draw from {0, 1} is 1 and is
// restarted with a 20 ms delay alternative.
queue <| Supervisor.start sup job2 (Delayed <| timeOutMillis 20) (testMinion "delayedTest" (fun _ -> rand.Next(0, 2) = 1))
// NOTE(review): both stop requests are queued right after the start requests;
// nothing in this script orders them relative to the registrations — confirm
// this is the intended demonstration.
queue <| Supervisor.stop sup job1
queue <| Supervisor.stop sup job2
// After a 1000 ms timeout, trigger the supervisor shutdown, wait for its
// acknowledgement and print the closing message.
queue <| (timeOutMillis 1000 >>=. OneTimeSignalSource.triggerAndAwaitAck shutdown >>- fun () -> printfn "All done...")
