Skip to content

Instantly share code, notes, and snippets.

@mavnn
Last active September 1, 2016 13:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mavnn/57e39ce97c7de1aa3f24da77478412fc to your computer and use it in GitHub Desktop.
Save mavnn/57e39ce97c7de1aa3f24da77478412fc to your computer and use it in GitHub Desktop.
Hopac supervisor using proc
#!/usr/bin/env fsharpi --exec
#I "packages/Hopac/lib/net45"
#r "Hopac.Core.dll"
#r "Hopac.dll"
#r "Hopac.Platform.dll"
open System
open Hopac
open Hopac.Infixes
/// What the supervisor does once a minion's process has ended:
/// start it again immediately, stop supervising it, or start it again
/// after waiting for the given delay.
type Policy = Restart | Terminate | Delayed of TimeSpan
/// Static description of a supervised minion: its unique name, its restart
/// policy, and the job to run. `job` is called with, in order: the channel on
/// which the supervisor sends shutdown requests (each carrying an IVar to fill
/// as an acknowledgement), a function for reporting a new last will, and the
/// last will to resume from, if any.
type MinionInfo =
    { name : string
      policy : Policy
      job : Ch<IVar<unit>> -> (obj -> Job<unit>) -> obj option -> Job<unit> }
/// Runtime bookkeeping for one running minion: its description, the last
/// will it reported (if any), and the channel used to ask it to shut down.
type MinionState =
    { info : MinionInfo
      state : obj option
      shutdown : Ch<IVar<unit>> }
/// Identifier the supervisor assigns to each started minion process.
/// The case is private, so ids cannot be constructed outside the enclosing scope.
type JobId = private | JobId of int
/// The supervisor loop's private, immutable state:
/// - `ident`: the next JobId number to hand out;
/// - `minions`: running minions keyed by id;
/// - `processes`: for each running minion, an alternative that yields its id
///   once its process ends;
/// - `delayed`: pending delayed restarts keyed by minion name, each yielding
///   the name and a function that performs the restart on a given state.
type private SupervisorState =
    { ident : int
      minions : Map<JobId, MinionState>
      processes : Map<JobId, Alt<JobId>>
      delayed : Map<string, Alt<string * (SupervisorState -> Job<SupervisorState>)>> }
/// Pure operations on the supervisor's bookkeeping record. Every function
/// returns a new state value; none of them performs any side effect.
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module private SupervisorState =

    /// Forgets minion `jobId` and its process-termination alternative.
    let removeMinion jobId state =
        { state with
            processes = Map.remove jobId state.processes
            minions = Map.remove jobId state.minions }

    /// Records a freshly started minion under `jobId` and advances the id
    /// counter. The process `p` is stored as an alternative that yields
    /// `jobId` once the process ends.
    /// `_will` is not used here (the will is already inside `minionState`); the
    /// parameter is kept only so existing call sites keep compiling.
    let addMinion jobId minionState (p : Proc) _will state =
        { state with
            ident = state.ident + 1
            processes = Map.add jobId (p ^-> fun () -> jobId) state.processes
            minions = Map.add jobId minionState state.minions }

    /// Registers a pending delayed restart under the minion's name.
    let addDelayed name promise state =
        { state with
            delayed = Map.add name promise state.delayed }

    /// Drops the pending delayed restart registered under `name`, if any.
    let removeDelayed name state =
        { state with
            delayed = Map.remove name state.delayed }

    /// Replaces the last will of minion `jobId`. A will from a minion that is
    /// no longer tracked (e.g. already removed) is ignored. Only the one entry
    /// is touched, instead of rebuilding the whole map.
    let updateWill jobId will state =
        match Map.tryFind jobId state.minions with
        | Some minion ->
            { state with
                minions = Map.add jobId { minion with state = Some will } state.minions }
        | None -> state

    /// Names of all supervised minions: the running ones followed by those
    /// waiting for a delayed restart.
    let jobNames state =
        let running =
            state.minions
            |> Map.toList
            |> List.map (fun (_, { info = { name = n } }) -> n)
        let delayed =
            state.delayed
            |> Map.toList
            |> List.map fst
        List.concat [running; delayed]

    /// Finds the running minion called `name`, returning its id and state.
    let jobState state name =
        state.minions
        |> Map.toSeq
        |> Seq.tryFind (fun (_, minionState) -> minionState.info.name = name)
/// Client handle to a running supervisor:
/// - `shutdown`: send an IVar to stop all minions; the supervisor fills it when done;
/// - `register`: start supervising a new minion;
/// - `unregister`: stop and forget the minion with the given name.
type Supervisor =
    { shutdown : Ch<IVar<unit>>
      register : Ch<MinionInfo>
      unregister : Ch<string> }
/// Functions for creating and running supervisors.
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Supervisor =

    /// Starts a supervisor server job and returns the channels used to talk to it.
    ///
    /// `logger` receives every diagnostic message the supervisor produces.
    /// The server is a loop over an immutable `SupervisorState`. Each iteration
    /// waits for exactly one of these events:
    /// - a shutdown request, which ends the loop;
    /// - a due delayed restart;
    /// - a register or unregister request;
    /// - a new last will from a minion;
    /// - the end of a minion's process.
    let create logger =
        let shutdownCh = Ch()
        let registerCh = Ch()
        let unregisterCh = Ch()
        let lastWillCh = Ch()

        // Starts `minionInfo.job` as a process, unless a minion with the same
        // name is already running or waiting for a delayed restart. `will` is
        // the state handed to the job. The job reports newer wills through the
        // function it receives, which forwards them to `lastWillCh`.
        let startMinion minionInfo will state =
            logger <| sprintf "Starting minion %A" minionInfo.name
            if SupervisorState.jobNames state |> List.contains minionInfo.name then
                logger <| sprintf "New minion not started; %A already supervised" minionInfo.name
                Job.result state
            else
                let jobId = JobId state.ident
                let minionShutdown = Ch()
                let minionState = { info = minionInfo; state = will; shutdown = minionShutdown }
                Proc.start (minionInfo.job minionShutdown (fun o -> Ch.send lastWillCh (jobId, o)) will)
                >>- fun p ->
                    logger <| sprintf "Minion %A (%A) started" minionInfo.name jobId
                    SupervisorState.addMinion jobId minionState p will state

        // Asks a minion to shut down and waits for its acknowledgement.
        // Both the request and the wait for the acknowledgement are raced
        // against the end of the minion's process. Without this race, a minion
        // that has already died (but whose termination the loop has not yet
        // handled), or that dies before acknowledging, would block the caller,
        // and therefore the whole supervisor loop, forever.
        let stopMinion jobId (minionState : MinionState) state =
            let finished =
                match Map.tryFind jobId state.processes with
                | Some proc -> proc ^-> ignore
                | None -> Alt.never ()
            let ack = IVar ()
            Alt.choose [
                minionState.shutdown *<- ack ^=>. Alt.choose [ IVar.read ack; finished ]
                finished
            ]

        // Stops and forgets the minion called `name`. A minion that is only
        // waiting for a delayed restart has no process to stop. Its pending
        // restart is cancelled instead, so it does not come back after being
        // unregistered.
        let unregisterMinion name state =
            logger <| sprintf "Unregistering %A started" name
            match SupervisorState.jobState state name with
            | Some (jobId, minionState) ->
                logger <| sprintf "Shutting down %A (%A)" name jobId
                stopMinion jobId minionState state
                >>-. SupervisorState.removeMinion jobId state
            | None ->
                if Map.containsKey name state.delayed then
                    logger <| sprintf "Cancelling delayed restart of %A" name
                    SupervisorState.removeDelayed name state
                    |> Job.result
                else
                    logger <| sprintf "Received request to unregister unknown job %A" name
                    Job.result state

        // Applies the restart policy of the minion whose process just ended.
        let handleTermination state jobId =
            let minionState = Map.find jobId state.minions
            match minionState.info.policy with
            | Terminate ->
                logger <| sprintf "%A (%A) terminated; removing from supervision" minionState.info.name jobId
                SupervisorState.removeMinion jobId state
                |> Job.result
            | Restart ->
                logger <| sprintf "%A (%A) terminated; restarting" minionState.info.name jobId
                SupervisorState.removeMinion jobId state
                |> startMinion minionState.info minionState.state
            | Delayed delay ->
                logger <| sprintf "%A (%A) terminated; restarting in %A" minionState.info.name jobId delay
                // After `delay`, yields the name plus a function that restarts the
                // minion (with its last will) on whatever state is current then.
                let promise =
                    timeOut delay
                    >>-. (minionState.info.name, startMinion minionState.info minionState.state)
                    |> memo
                state
                |> SupervisorState.removeMinion jobId
                |> SupervisorState.addDelayed minionState.info.name (Promise.read promise)
                |> Job.result

        // Records the new last will reported by minion `jobId`.
        let replaceLastWill state (jobId, will) =
            logger <| sprintf "(%A) sent new will" jobId
            SupervisorState.updateWill jobId will state
            |> Job.result

        let rec loop state =
            // Sanity check; should fail hard if these fail...
            // These should be removed after testing.
            let processIds = state.processes |> Map.toList |> List.map fst
            if processIds <> (state.minions |> Map.toList |> List.map fst) then
                failwithf "Unmatched process and minion maps\nprocesses: %A\n minions: %A"
                    processIds
                    (state.minions |> Map.toList |> List.map fst)
            if processIds <> List.distinct processIds then
                failwithf "duplicate process ids?"
            let minionNames = state.minions |> Map.toList |> List.map snd |> List.map (fun ms -> ms.info.name)
            if minionNames <> List.distinct minionNames then
                failwithf "duplicate minion names detected\n%A" minionNames
            Alt.choose [
                // shutdown: ask every minion to stop, concurrently so one
                // unresponsive minion cannot keep the others from being asked.
                // Give up after one second, acknowledge, and do not recurse.
                shutdownCh ^=> fun ack ->
                    let shutdownAll =
                        state.minions
                        |> Map.toSeq
                        |> Seq.map (fun (jobId, minionState) -> stopMinion jobId minionState state)
                        |> Job.conIgnore
                        |> Job.map (fun () -> logger "All minions shutdown")
                        |> memo
                    logger "Shutting down minions!"
                    Alt.choose [
                        shutdownAll |> Promise.read
                        timeOutMillis 1000 |> Alt.afterFun (fun () -> logger "Minion shutdown timed out without all minions shutting down cleanly")
                    ] >>=. ack *<= ()
                // anything else will create a new state and then recurse into the loop
                Alt.choose [
                    // process delayed restarts
                    state.delayed
                    |> Map.toSeq
                    |> Seq.map snd
                    |> Alt.choose
                    |> Alt.afterJob
                        (fun (delayName, restart) ->
                            state
                            |> SupervisorState.removeDelayed delayName
                            |> restart)
                    // register new minion
                    registerCh ^=>
                        fun minionInfo ->
                            startMinion minionInfo None state
                    // unregister minion
                    unregisterCh ^=>
                        fun name -> unregisterMinion name state
                    // new last will
                    lastWillCh ^=> replaceLastWill state
                    // handle termination
                    state.processes
                    |> Map.toSeq
                    |> Seq.map snd
                    |> Alt.choose
                    |> Alt.afterJob (fun jid -> handleTermination state jid)
                ] |> Alt.afterJob loop
            ]

        loop { ident = 0; minions = Map.empty; processes = Map.empty; delayed = Map.empty }
        |> start
        {
            shutdown = shutdownCh
            register = registerCh
            unregister = unregisterCh
        }
// interactive testing...

/// A toy minion for trying out the supervisor interactively.
///
/// Every 500 ms it calls `failIf` with its current counter. If that returns
/// true, the minion fails with "boom". Otherwise it prints a line, reports
/// `counter + 1` as its new last will, and carries on with that counter.
/// When a request arrives on `shutdown`, it fills the IVar it received and stops.
/// It resumes from `lastWill` when that is a boxed `int`, and from 0 otherwise.
let testMinion name failIf shutdown (sendWill : obj -> Job<unit>) (lastWill : obj option) : Job<unit> =
    let rec loop state =
        Alt.choose [
            shutdown ^=> fun ack -> ack *<= ()
            timeOutMillis 500
            |> Alt.afterFun (fun () ->
                if failIf state then failwith "boom"
                else printfn "teeeeeeeest minion! [%s - %d]" name state)
            // Wills travel as `obj`; box explicitly rather than relying on an
            // implicit int -> obj coercion through `<|`.
            |> Alt.afterJob (fun () -> sendWill (box (state + 1)))
            |> Alt.afterJob (fun () -> loop (state + 1))
        ] :> Job<unit>
    match lastWill with
    | Some (:? int as counter) -> loop counter
    | _ -> loop 0
// Supervisor that logs every message to stdout. The statements below are
// presumably meant to be evaluated one at a time in FSI (see the
// "interactive testing" note above) so the minions' output can be watched.
let sup = Supervisor.create (printfn "%s")
// Random source used by test2's failure predicate.
let rand = System.Random()
// Never fails; its policy restarts it immediately if it ever terminates.
let test1 = { name = "test1"; policy = Restart; job = testMinion "test1" (fun _ -> false) }
// Fails whenever rand.Next(0, 2) returns 1 (about half of its ticks);
// restarted one second after each failure.
let test2 = { name = "delayedTest"; policy = Delayed <| TimeSpan.FromSeconds 1.; job = testMinion "delayedTest" (fun _ -> rand.Next(0, 2) = 1) }
// Register both minions with asynchronous sends, without waiting for the supervisor.
sup.register *<+ test1 |> start
sup.register *<+ test2 |> start
// Unregister them; `run` waits until the supervisor has taken each request.
sup.unregister *<- "test1" |> run
sup.unregister *<- "delayedTest" |> run
// Ask the supervisor to shut down and wait for its acknowledgement.
sup.shutdown *<-=>- id |> run
@mavnn
Copy link
Author

mavnn commented Sep 1, 2016

So the idea here is that "minions" are managed by the supervisor. Each minion is given a "sendWill" function that lets it tell the supervisor what state it wants to be restarted with if it dies unexpectedly. It is also given a shutdown channel on which the supervisor asks it to shut down; the minion acknowledges by filling the IVar it receives.

There are currently three restart policies: don't restart (Terminate), restart immediately (Restart), or restart after a delay (Delayed). I'll be adding more.

If you use Paket to grab a copy of Hopac, you should be able to run the entire supervisor in an interactive F# session and have a play.

@polytypic
Copy link

I haven't looked at the code yet, but I'm thinking of adding details about the reason for termination to the Proc mechanism. Something like this:

type Reason =
  | Raised of exn
  | RanToCompletion
  | BecameGarbage

It would be possible to add the final value returned by the job (boxed as an obj or even have a type variable in Proc), but I believe that would be a misfeature. If you want a result from the job, you should be doing something else.

BecameGarbage is what you would get when the job is finalized. That could happen, for example, because the job was waiting for a message on a channel that became garbage, or because it executed abort or synchronized on never. I believe trying to differentiate further between these cases would also be a misfeature.

@mavnn
Copy link
Author

mavnn commented Sep 1, 2016

The Reason type would be nice — in particular, knowing whether the job simply finished or whether it raised an exception. I agree with you that it should not give access to the returned value, though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment