-
-
Save mavnn/57e39ce97c7de1aa3f24da77478412fc to your computer and use it in GitHub Desktop.
#!/usr/bin/env fsharpi --exec | |
#I "packages/Hopac/lib/net45" | |
#r "Hopac.Core.dll" | |
#r "Hopac.dll" | |
#r "Hopac.Platform.dll" | |
open System | |
open Hopac | |
open Hopac.Infixes | |
/// What the supervisor does when a minion's process terminates.
type Policy =
    /// Start the minion again straight away, seeded with its most recent last will.
    | Restart
    /// Remove the minion from supervision; it is not started again.
    | Terminate
    /// Start the minion again, seeded with its most recent last will, once the given delay has elapsed.
    | Delayed of TimeSpan
/// Static description of a job ("minion") handed to the supervisor for supervision.
type MinionInfo =
    {
        /// Minion name; the supervisor refuses to start a second minion under a name it already supervises.
        name : string
        /// What happens when this minion's process terminates.
        policy : Policy
        /// The minion body. Arguments, in order:
        ///  - the channel on which the supervisor sends an IVar to request shutdown (the supervisor
        ///    waits for that IVar to be filled as the acknowledgement);
        ///  - a function the minion calls to send the supervisor a new "last will";
        ///  - the last will to resume from (None when freshly registered or no will was recorded).
        job : Ch<IVar<unit>> -> (obj -> Job<unit>) -> obj option -> Job<unit>
    }
/// Book-keeping the supervisor holds for each running minion.
type MinionState =
    {
        /// The description the minion was registered with.
        info : MinionInfo
        /// The will it was started with, replaced by each last will the minion sends afterwards.
        state : obj option
        /// The channel the supervisor uses to ask this particular minion to shut down.
        shutdown : Ch<IVar<unit>>
    }
/// Identifier minted by the supervisor for every minion process it starts.
/// The case is private, so ids cannot be forged by code outside the enclosing module.
type JobId = private | JobId of int
/// Immutable snapshot of everything the supervisor loop tracks; each event produces a new snapshot.
type private SupervisorState =
    {
        /// Next integer used when minting a JobId.
        ident : int
        /// Running minions, keyed by job id.
        minions : Map<JobId, MinionState>
        /// For each running minion, an alternative that yields its JobId once its process terminates.
        processes : Map<JobId, Alt<JobId>>
        /// Minions waiting for a Delayed restart, keyed by name. Each alternative yields the name
        /// together with the state transition that restarts the minion.
        delayed : Map<string, Alt<string * (SupervisorState -> Job<SupervisorState>)>>
    }
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module private SupervisorState =
    /// Drops a minion and its termination alternative from the state.
    let removeMinion jobId state =
        { state with
            processes = Map.remove jobId state.processes
            minions = Map.remove jobId state.minions }

    /// Records a freshly started minion: stores its state, registers an alternative that yields
    /// [jobId] when the process [p] terminates, and advances the id counter.
    /// [_will] is not used; it is kept so existing call sites stay valid.
    let addMinion jobId minionState (p : Proc) _will state =
        { state with
            ident = state.ident + 1
            processes = Map.add jobId (p ^-> fun () -> jobId) state.processes
            minions = Map.add jobId minionState state.minions }

    /// Registers a pending delayed restart under the minion's name.
    let addDelayed name promise state =
        { state with delayed = Map.add name promise state.delayed }

    /// Forgets a pending delayed restart.
    let removeDelayed name state =
        { state with delayed = Map.remove name state.delayed }

    /// Stores [will] as the last will of [jobId].
    /// An unknown id (e.g. a will that arrives after the minion was removed) leaves the state unchanged.
    /// Only the one affected entry is touched, rather than mapping over every minion.
    let updateWill jobId will state =
        match Map.tryFind jobId state.minions with
        | Some minion ->
            { state with minions = Map.add jobId { minion with state = Some will } state.minions }
        | None -> state

    /// Names of every supervised minion: running ones first (in JobId order), then those
    /// awaiting a delayed restart.
    let jobNames state =
        let running = state.minions |> Map.toList |> List.map (fun (_, m) -> m.info.name)
        let delayed = state.delayed |> Map.toList |> List.map fst
        running @ delayed

    /// Finds the running minion with the given name, returning its id and state.
    let jobState state name =
        state.minions
        |> Map.toSeq
        |> Seq.tryFind (fun (_, minionState) -> minionState.info.name = name)
/// Handle returned by Supervisor.create; every interaction with the supervisor loop goes through these channels.
type Supervisor =
    {
        /// Send an IVar to stop all minions; the IVar is filled once shutdown has completed or timed out.
        shutdown : Ch<IVar<unit>>
        /// Send a minion description to start supervising it.
        register : Ch<MinionInfo>
        /// Send a minion name to stop supervising it.
        unregister : Ch<string>
    }
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Supervisor =
    /// Starts a supervisor loop and returns the channels used to drive it.
    /// [logger] is called with a human-readable line for each supervision event.
    ///   register   - start supervising a minion (ignored if its name is already supervised).
    ///   unregister - stop a running minion, or cancel its pending delayed restart.
    ///   shutdown   - ask every running minion to stop (bounded by a 1000 ms timeout), then fill
    ///                the supplied IVar; the loop does not continue after this.
    let create logger =
        let shutdownCh = Ch()
        let registerCh = Ch()
        let unregisterCh = Ch()
        let lastWillCh = Ch()

        // Asks a running minion to shut down and completes once it acknowledges. The request is
        // raced against the minion's own termination: a minion that has already died (before the
        // loop handled that termination) never takes the request, and waiting for it
        // unconditionally would block the supervisor loop forever.
        let shutdownMinion state jobId (minionState : MinionState) =
            let acknowledged = minionState.shutdown *<-=>- id
            match Map.tryFind jobId state.processes with
            | Some terminated -> acknowledged <|> (terminated ^-> ignore)
            | None -> acknowledged

        // Starts a minion with the given last will, unless a minion with the same name is already
        // running or awaiting a delayed restart. Returns the updated state.
        let startMinion minionInfo will state =
            logger <| sprintf "Starting minion %A" minionInfo.name
            if SupervisorState.jobNames state |> List.contains minionInfo.name then
                logger <| sprintf "New minion not started; %A already supervised" minionInfo.name
                Job.result state
            else
                let jobId = JobId state.ident
                let minionShutdown = Ch()
                let minionState = { info = minionInfo; state = will; shutdown = minionShutdown }
                // The minion reports new wills by sending (jobId, will) on lastWillCh.
                Proc.start (minionInfo.job minionShutdown (fun o -> Ch.send lastWillCh (jobId, o)) will)
                >>- fun p ->
                    logger <| sprintf "Minion %A (%A) started" minionInfo.name jobId
                    SupervisorState.addMinion jobId minionState p will state

        // Stops supervising the named minion. A running minion is asked to shut down and removed;
        // a minion awaiting a delayed restart has that restart cancelled.
        let unregisterMinion name state =
            logger <| sprintf "Unregistering %A started" name
            match SupervisorState.jobState state name with
            | Some (jobId, minionState) ->
                logger <| sprintf "Shutting down %A (%A)" name jobId
                shutdownMinion state jobId minionState
                >>-. SupervisorState.removeMinion jobId state
            | None when Map.containsKey name state.delayed ->
                // Dropping the pending entry means the loop never selects the delayed restart.
                logger <| sprintf "Cancelling delayed restart of %A" name
                state
                |> SupervisorState.removeDelayed name
                |> Job.result
            | None ->
                logger <| sprintf "Received request to unregister unknown job %A" name
                Job.result state

        // Applies the terminated minion's policy: drop it, restart it now, or schedule a restart
        // after the configured delay. Restarts reuse its most recent last will.
        let handleTermination state jobId =
            let minionState = Map.find jobId state.minions
            match minionState.info.policy with
            | Terminate ->
                logger <| sprintf "%A (%A) terminated; removing from supervision" minionState.info.name jobId
                SupervisorState.removeMinion jobId state
                |> Job.result
            | Restart ->
                logger <| sprintf "%A (%A) terminated; restarting" minionState.info.name jobId
                SupervisorState.removeMinion jobId state
                |> startMinion minionState.info minionState.state
            | Delayed delay ->
                logger <| sprintf "%A (%A) terminated; restarting in %A" minionState.info.name jobId delay
                let promise =
                    timeOut delay
                    >>-. (minionState.info.name, startMinion minionState.info minionState.state)
                    |> memo
                state
                |> SupervisorState.removeMinion jobId
                |> SupervisorState.addDelayed minionState.info.name (Promise.read promise)
                |> Job.result

        // Records a new last will sent by a minion.
        let replaceLastWill state (jobId, will) =
            logger <| sprintf "(%A) sent new will" jobId
            SupervisorState.updateWill jobId will state
            |> Job.result

        // One supervisor step: wait for the next event, compute the next state and recurse
        // (or, on shutdown, stop the minions and finish).
        let rec loop state =
            // Sanity check; should fail hard if these fail...
            // These should be removed after testing.
            let processIds = state.processes |> Map.toList |> List.map fst
            let minionIds = state.minions |> Map.toList |> List.map fst
            if processIds <> minionIds then
                failwithf "Unmatched process and minion maps\nprocesses: %A\n minions: %A" processIds minionIds
            if processIds <> List.distinct processIds then
                failwithf "duplicate process ids?"
            let minionNames = state.minions |> Map.toList |> List.map (fun (_, ms) -> ms.info.name)
            if minionNames <> List.distinct minionNames then
                failwithf "duplicate minion names detected\n%A" minionNames
            Alt.choose [
                // shutdown
                shutdownCh ^=>
                    fun ack ->
                        let shutdownAll =
                            state.minions
                            |> Map.toSeq
                            |> Seq.map (fun (jobId, minionState) -> shutdownMinion state jobId minionState)
                            |> Job.seqIgnore
                            |> Job.map (fun () -> logger "All minions shutdown")
                            |> memo
                        logger "Shutting down minions!"
                        Alt.choose [
                            shutdownAll |> Promise.read
                            timeOutMillis 1000 |> Alt.afterFun (fun () -> logger "Minion shutdown timed out without all minions shutting down cleanly")
                        ] >>=. ack *<= ()
                // anything else will create a new state and then recurse into the loop
                Alt.choose [
                    // process delayed restarts
                    state.delayed
                    |> Map.toSeq
                    |> Seq.map snd
                    |> Alt.choose
                    |> Alt.afterJob (fun (delayName, restart) -> state |> SupervisorState.removeDelayed delayName |> restart)
                    // register new minion
                    registerCh ^=> (fun minionInfo -> startMinion minionInfo None state)
                    // unregister minion
                    unregisterCh ^=> (fun name -> unregisterMinion name state)
                    // new last will
                    lastWillCh ^=> replaceLastWill state
                    // handle termination
                    state.processes
                    |> Map.toSeq
                    |> Seq.map snd
                    |> Alt.choose
                    |> Alt.afterJob (fun jid -> handleTermination state jid)
                ] |> Alt.afterJob loop
            ]

        loop { ident = 0; minions = Map.empty; processes = Map.empty; delayed = Map.empty }
        |> start
        {
            shutdown = shutdownCh
            register = registerCh
            unregister = unregisterCh
        }
// interactive testing...
/// Demo minion. Every 500 ms it either fails (when [failIf] returns true for the current counter)
/// or prints a line; it then sends counter + 1 as its last will and keeps counting.
/// It acknowledges shutdown requests received on [shutdown]. When restarted with an int last will
/// it resumes from that value; otherwise it starts from 0.
let testMinion name failIf shutdown (sendWill : obj -> Job<unit>) (lastWill : obj option) : Job<unit> =
    let rec loop state =
        Alt.choose [
            shutdown ^=> fun ack -> ack *<= ()
            timeOutMillis 500
            |> Alt.afterFun (fun () -> if failIf state then failwith "boom" else printfn "teeeeeeeest minion! [%s - %d]" name state)
            // Box explicitly: sendWill takes obj, and an int argument only type-checks implicitly on F# 6+.
            |> Alt.afterJob (fun () -> sendWill (box (state + 1)))
            |> Alt.afterJob (fun () -> loop (state + 1))
        ] :> Job<unit>
    match lastWill with
    | Some (:? int as n) -> loop n
    | _ -> loop 0
// Start a supervisor that logs to stdout, register two demo minions, unregister both, then shut down.
let sup = Supervisor.create (printfn "%s")
let rand = Random()
let test1 =
    { name = "test1"
      policy = Restart
      job = testMinion "test1" (fun _ -> false) }
let test2 =
    { name = "delayedTest"
      policy = Delayed (TimeSpan.FromSeconds 1.)
      // rand.Next(0, 2) is 0 or 1, so this minion fails on roughly half of its ticks
      job = testMinion "delayedTest" (fun _ -> rand.Next(0, 2) = 1) }
for minion in [ test1; test2 ] do
    sup.register *<+ minion |> start
for minionName in [ "test1"; "delayedTest" ] do
    sup.unregister *<- minionName |> run
sup.shutdown *<-=>- id |> run
I haven't looked at the code yet, but I'm thinking of adding details about the reason for termination to the `Proc` mechanism. Something like this:
/// Proposed: why a Proc terminated.
type Reason =
    /// The job raised the given exception.
    | Raised of exn
    /// The job finished normally.
    | RanToCompletion
    /// The job was finalized: e.g. it was waiting on a channel that became garbage,
    /// executed `abort`, or synchronized on `never`.
    | BecameGarbage
It would be possible to add the final value returned by the job (boxed as an `obj`, or even via a type variable on `Proc`), but I believe that would be a misfeature. If you want a result from the job, you should be doing something else.
`BecameGarbage` is what you would get when the job is finalized. For example, the job might have been waiting for a message on a channel that became garbage, or it might have executed `abort` or synchronized on `never`. I believe trying to differentiate further between these cases would also be a misfeature.
The `Reason` type would be nice — in particular, knowing whether the job simply finished or whether it `Raised` an exception. I agree with you that it should not give access to the returned value, though.
So the idea here is that "minions" are managed by the supervisor. Each minion is given a `sendWill` function that lets it tell the supervisor what state it wishes to be restarted with if it dies unexpectedly, and a shutdown channel on which the supervisor can tell it to shut down.
Currently the restart policies are: don't restart (`Terminate`), restart immediately (`Restart`), or restart after a delay (`Delayed`). I'll be adding some more.
If you use Paket to grab a copy of Hopac, you should be able to run the entire supervisor in an interactive session and have a play.