@haf
Last active August 29, 2016 07:18
Supervision Hopac
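```fsharp
module Logary.Supervisor

open System
open Hopac
open Hopac.Infixes
open NodaTime

type PolicyStep =
  | Sleep of dur:Duration
  | RestartOne
  | RestartTree

type Policy =
  | Restart
  | AllowedFailureRate of rate:float

type PointName = PointName of string []

// Logger and JobId appear in the public signatures of create/register,
// so they are public rather than private.
type Logger = string -> unit
type JobId = PointName
type LastWillCh = Ch<obj>
type private NamedJob = JobId * LastWillCh * (LastWillCh -> Job<unit>)

type Runnable =
  LastWillCh -> Job<unit>

type Running =
  private { shutdown : Ch<unit> }

type Initial =
  private {
    start : IVar<unit>
    supervise : Ch<Policy * Runnable * IVar<JobId>>
    running : Running }

let private name = PointName [| "Logary"; "Supervisor" |]

let create (logger : Logger) : Initial =
  // HACK to inject logging for PoC:
  let log msg next x = logger msg ; next x
  let startCh : IVar<unit> = IVar ()
  let shutdownCh : Ch<unit> = Ch ()
  let superviseCh : Ch<Policy * Runnable * IVar<JobId>> = Ch ()
  let erroredCh : Ch<JobId * exn> = Ch ()
  // Most recent state each job has sent on its last-will channel
  // (not yet handed back to a job when it restarts).
  let lastWill : Map<JobId, obj> ref = ref Map.empty

  // Run one job in the background; an exception that escapes it is
  // reported to the supervisor on erroredCh.
  let spawn ((jobId, lastWillCh, fxJ) : NamedJob) =
    queue (
      Job.tryIn (Job.delay (fun () -> fxJ lastWillCh))
        Job.result
        (fun ex -> erroredCh *<- (jobId, ex))
    )

  // Before start: accept registrations.
  let rec ready (jobs : NamedJob list) : Alt<unit> =
    Alt.choose [
      startCh ^=> fun () ->
        List.iter spawn jobs
        log "started" supervising jobs
      superviseCh ^=> fun (policy, fxJ, idRepl) ->
        // TODO: keep `policy` with the job so failures can be matched on it
        let jobId = PointName [| jobs.Length.ToString() |]
        let lastWillCh : LastWillCh = Ch ()
        idRepl *<= jobId >>=. ready ((jobId, lastWillCh, fxJ) :: jobs)
    ]

  // After start: watch the jobs, which are already running.
  and supervising (jobs : NamedJob list) : Alt<unit> =
    Alt.choose [
      yield erroredCh ^=> fun (jobId, ex) ->
        logger (sprintf "%A errored: %s" jobId ex.Message)
        // TODO:
        // match policy jobId ex with
        // | ...
        // Until then, restart only the job that failed (re-spawning every
        // job would leave duplicates of the healthy ones running).
        jobs |> List.filter (fun (jid, _, _) -> jid = jobId) |> List.iter spawn
        supervising jobs
      // Take each job's last will so that its sender is never left blocked.
      for jobId, lastWillCh, _ in jobs do
        yield lastWillCh ^=> fun state ->
          lastWill.Value <- Map.add jobId state lastWill.Value
          supervising jobs
      yield shutdownCh :> Alt<_>
    ]

  start (ready [])
  { start = startCh
    supervise = superviseCh
    running = { shutdown = shutdownCh } }

/// Start the non-running supervisor
let start (sup : Initial) : Job<Running> =
  sup.start *<= () >>-. sup.running

/// Register a job and its policy; returns the JobId the supervisor assigned.
/// Registrations are only accepted before `start`.
let register (p : Policy, fxJ : Runnable) (sup : Initial) : Alt<JobId> =
  sup.supervise *<-=>- fun ackCh -> p, fxJ, ackCh

/// Shut down the running supervisor (the jobs themselves are not cancelled)
let shutdown (sup : Running) : Alt<unit> =
  sup.shutdown *<- ()

// usage:
// Job.iterate runs in the supervised job itself (Job.iterateServer would
// start a separate job), so an exception raised here reaches the supervisor.
let runit (lastWillCh : LastWillCh) : Job<unit> =
  Job.iterate 1 (fun i ->
    timeOutMillis 1000 >>=. lastWillCh *<- box (i + 1) >>-. i + 1)

[<EntryPoint>]
let main argv =
  let sup = create (printfn "%s")
  let jobId = sup |> register (Restart, runit) |> run
  printfn "Got job id: %A" jobId
  let running = sup |> start |> run
  // give the job time to tick a few times
  timeOutMillis 3500 |> run
  printfn "Shutting down"
  running |> shutdown |> run
  0
```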
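The supervisor leaves `match policy jobId ex with` as a TODO. Below is a minimal, dependency-free sketch of how the two `Policy` cases could be turned into `PolicyStep` values. It is not the author's design: the function `decide` is hypothetical, it assumes `AllowedFailureRate rate` means "failures per second, averaged over a sliding window", it treats `RestartTree` as the escalation when that rate is exceeded, and it uses `System.TimeSpan` in place of NodaTime's `Duration` so the types are local stand-ins rather than the gist's own.

```fsharp
open System

// Local stand-ins for the gist's types; TimeSpan replaces NodaTime's
// Duration so this sketch needs no packages.
type PolicyStep =
  | Sleep of dur:TimeSpan
  | RestartOne
  | RestartTree

type Policy =
  | Restart
  | AllowedFailureRate of rate:float

/// Hypothetical policy check. ASSUMES `rate` means "failures per second,
/// averaged over `window`"; the gist does not define it.
/// `earlier` holds the timestamps of the job's previous failures.
let decide (window : TimeSpan) (now : DateTime) (earlier : DateTime list)
           (policy : Policy) : PolicyStep list =
  match policy with
  | Restart ->
    [ RestartOne ]
  | AllowedFailureRate rate ->
    let recent =
      earlier |> List.filter (fun t -> now - t <= window) |> List.length
    // count the failure being handled right now as well
    let observed = float (recent + 1) / window.TotalSeconds
    if observed <= rate then [ Sleep (TimeSpan.FromSeconds 1.0); RestartOne ]
    else [ RestartTree ] // hypothetical escalation: restart every job
```

With a 10-second window, a single failure (0.1/s) is within `AllowedFailureRate 0.5` and yields a pause followed by `RestartOne`, while six failures inside the window (0.6/s) escalate to `RestartTree`.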
@haf (Author) commented Aug 28, 2016

The idea is that lastWillCh lets Logary targets send the state they should be re-initialised with if they fail. For example, the RabbitMQ target could send its Map<RequestId, Message> so that it doesn't lose track of its pending messages if we get an IOException. We can also make the RingBuffer persist across invocations of the named job, to avoid dropping messages. In the next iteration of the PoC I'll add policies. I should probably also look at the F# actor frameworks for inspiration on policies.
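A minimal sketch of the re-initialisation idea, supervising a single job. It assumes a hypothetical `ResumableRunnable` type (not in the gist) whose job is also handed the last state it published, and a hypothetical `superviseOne` that restarts the job on any exception and ignores policies entirely.

```fsharp
open Hopac
open Hopac.Infixes

type LastWillCh = Ch<obj>

/// Hypothetical variant of Runnable: the job also receives the state it last
/// sent on its last-will channel (None on the first start).
type ResumableRunnable = obj option -> LastWillCh -> Job<unit>

/// Runs one job, restarting it whenever it raises an exception and handing
/// the new instance the most recent last will the previous one sent.
let superviseOne (fxJ : ResumableRunnable) : Job<unit> =
  let rec spawn (saved : obj option) : Job<unit> =
    let lastWillCh : LastWillCh = Ch ()
    let failed : IVar<exn> = IVar ()
    // Queue the child as its own job; Job.tryIn turns an escaping
    // exception into a value on `failed`.
    Job.queue (Job.tryIn (Job.delay (fun () -> fxJ saved lastWillCh))
                         Job.result
                         (fun ex -> failed *<= ex))
    >>=. watch saved lastWillCh failed
  and watch (saved : obj option) (lastWillCh : LastWillCh)
            (failed : IVar<exn>) : Alt<unit> =
    Alt.choose [
      // Remember the newest state the child published.
      lastWillCh ^=> fun state -> watch (Some state) lastWillCh failed
      // The child failed: start a fresh instance with the saved state.
      failed ^=> fun _ -> spawn saved
    ]
  spawn None
```

Because `*<-` on the last-will channel only completes once the supervisor has taken the value, every state a job sends before it crashes is seen before the failure is. If the job returns normally, this sketch simply keeps waiting; a fuller version would treat that as a clean exit.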

@polytypic

BTW, the Proc mechanism in Hopac is partly intended for supervision. Currently it gives no information on why a running job was terminated, but that could be changed (e.g. by providing the exn instance when a job terminates by raising an exception). It uses finalizers, so it adds some overhead, but it can also account for jobs that are garbage collected (e.g. jobs that block forever).
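A sketch of restarting a job via Proc rather than `Job.tryIn`, assuming `Proc.start : Job<unit> -> Job<Proc>` and `Proc.join : Proc -> Alt<unit>` as listed in the Hopac API reference (check them against the Hopac version in use). `restartForever` is a hypothetical helper; since, as described above, a Proc reports no termination reason, it restarts the body after every kind of exit.

```fsharp
open Hopac

/// Starts `body` as a Proc and starts it again every time that Proc
/// terminates. Proc.join carries no reason, so a normal return, a raised
/// exception and a job collected while blocked are all handled alike.
let rec restartForever (body : Job<unit>) : Job<unit> = job {
  let! proc = Proc.start body
  do! Proc.join proc
  return! restartForever body
}
```

The design trade-off mirrors the comment: the finalizer-based join catches jobs that silently block forever, which `Job.tryIn` cannot, at the cost of some overhead and of not knowing why the job stopped.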
