Skip to content

Instantly share code, notes, and snippets.

@haf
Last active August 29, 2016 07:18
Show Gist options
  • Save haf/b13d3b09953e252f6bea4fe40c03be0f to your computer and use it in GitHub Desktop.
Save haf/b13d3b09953e252f6bea4fe40c03be0f to your computer and use it in GitHub Desktop.
Supervision Hopac
module Logary.Supervisor
open System
open Hopac
open Hopac.Infixes
open NodaTime
type PolicyStep =
| Sleep of dur:Duration
| RestartOne
| RestartTree
type Policy =
| Restart
| AllowedFailureRate of rate:float
type PointName = PointName of string []
type private Logger = string -> unit
type private JobId = PointName
type LastWillCh = Ch<obj>
type private NamedJob = JobId * LastWillCh * (LastWillCh -> Job<unit>)
type Runnable =
LastWillCh -> Job<unit>
type Running =
private { shutdown : Ch<unit> }
type Initial =
private {
start : IVar<unit>
supervise : Ch<Policy * Runnable * IVar<JobId>>
running : Running }
let private name = PointName [| "Logary"; "Supervisor" |]
let create (logger : Logger) : Initial =
// HACK to inject logging for PoC:
let log msg next x = logger msg ; next x
let startCh, shutdownCh, superviseCh, erroredCh =
IVar (), Ch (), Ch (), Ch ()
let lastWill : Map<JobId, obj> ref = ref Map.empty
let rec ready (jobs : NamedJob list) =
Alt.choose [
startCh ^=> fun () ->
log "started" running jobs
superviseCh ^=> fun (policy, fxJ, idRepl) ->
let jobId = PointName [| jobs.Length.ToString() |]
let lastWillCh = Ch ()
idRepl *<= jobId >>=. ready ((jobId, lastWillCh, fxJ) :: jobs)
]
and running jobs =
for jobId, lastWillCh, fxJ in jobs do
queue (
Job.tryIn (fxJ lastWillCh)
Job.result
(fun ex -> erroredCh *<- (jobId, ex))
)
Alt.choose [
erroredCh ^=> fun (jobId, ex) ->
logger (sprintf "%O errored" jobId)
// TODO:
// match policy jobId ex with
// | ...
running jobs
upcast shutdownCh
]
start (ready [])
{ start = startCh
supervise = superviseCh
running = { shutdown = shutdownCh }
}
/// Start the non-running supervisor
let start (sup : Initial) =
sup.start *<= () >>-. sup.running
/// Register a named job and its policy
let register (p : Policy, fxJ) (sup : Initial) : Alt<JobId> =
sup.supervise *<-=>- fun ackCh -> p, fxJ, ackCh
/// Shutdown the running supervisor
let shutdown (sup : Running) =
sup.shutdown *<- ()
// usage:
let runit lastWillCh =
Job.iterateServer 1 (fun i ->
timeOutMillis 1000 >>=.
lastWillCh *<- box (i + 1)
>>-. i + 1
)
[<EntryPoint>]
let main argv =
let sup = create (printfn "%s")
let jobId = sup |> register (Restart, runit) |> run
printfn "Got job id: %A" jobId
let running = sup |> start |> run
printfn "Shutting down"
running |> shutdown |> run
0
@polytypic
Copy link

BTW, the Proc mechanism in Hopac is partly for supervision. Currently it gives no information on the reason why a running job was terminated, but that could be changed (e.g. giving exn instance in case of job being terminated due to raising an exception). It uses finalizers, so it adds some overhead, but it can also account for jobs that are garbage collected (e.g. block forever).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment