-
-
Save haf/dc6b83aa153908efddc5f341b29fdbdc to your computer and use it in GitHub Desktop.
Logary Eventing Spike
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Logary.Eventing | |
open System | |
open NodaTime | |
open Hopac | |
open Hopac.Infixes | |
open Aether | |
/// Stubs for the rest of Logary's API
[<AutoOpen>]
module Reffed =

  /// Apply `f` with its two arguments swapped.
  let flip f a b = f b a

  type Value = float
  type Units = string
  type PointName = string []

  /// Split a dot-separated name into a `PointName`, dropping empty segments.
  /// Raises `ArgumentException` when no segment remains.
  let pn (name : string) =
    match name.Split([| '.' |], StringSplitOptions.RemoveEmptyEntries) with
    | [||] -> invalidArg "name" "is empty"
    | xs -> xs

  type Message =
    { /// nanoseconds since the Unix epoch (as produced by `derivedWithUnit`)
      timestamp : int64
      name      : PointName
      value     : Value * Units }

    /// Create a message named `name` carrying value `v` in unit `u`, stamped
    /// with the current UTC time.
    static member derivedWithUnit name v u =
      // `DateTime.Ticks` counts 100 ns intervals since 0001-01-01; multiplying
      // that by 100 overflows int64. Counting from the Unix epoch keeps the
      // nanosecond value inside the int64 range.
      let unixEpochTicks = DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks
      { timestamp = (DateTime.UtcNow.Ticks - unixEpochTicks) * 100L
        name      = name
        value     = v, u }

  /// Getter/setter pair for the `value` field of a `Message`.
  let value_ =
    (fun (x : Message) -> x.value),
    (fun v (x : Message) -> { x with value = v })

  type Named =
    abstract name : PointName

  module internal Random =

    /// One generator per thread.
    let random = new Threading.ThreadLocal<_>(fun () -> System.Random ())

    /// get the next int within [Int32.MinValue, Int32.MaxValue)
    let nextInt () =
      random.Value.Next (Int32.MinValue, Int32.MaxValue)

    /// get the next int within [0, max)
    let nextIntMax max =
      random.Value.Next max

    /// get the next int64, built from eight random bytes
    let nextInt64 () =
      // A fresh buffer per call: the generator is thread-local, so a single
      // module-level buffer would be written concurrently by several threads.
      let buf : byte [] = Array.zeroCreate sizeof<int64>
      random.Value.NextBytes buf
      BitConverter.ToInt64 (buf, 0)

    /// get the next int64 within [0, max); `max` must be positive
    let nextInt64Max (max : int64) =
      if max <= 0L then invalidArg "max" "must be positive"
      let rec draw () =
        // clear the sign bit so the candidate is non-negative
        let bits = nextInt64 () &&& Int64.MaxValue
        let value = bits % max
        // the sum overflows (goes negative) only when `bits` came from the
        // incomplete last bucket of the range; redraw in that case
        if bits - value + (max - 1L) < 0L then draw () else value
      draw ()

  type Duration with
    /// The duration expressed in nanoseconds (ticks × 100).
    member x.nanoSeconds =
      x.Ticks * 100L
/// An immutable, sorted view over a set of sampled values, with summary
/// statistics computed on demand.
module Snapshot =

  type T =
    private { values : int64 [] }

  /// Create a snapshot from the given values. The input is copied before
  /// sorting, so the caller's array is neither reordered nor aliased.
  let create (unsorted : int64 []) =
    { values = Array.sort unsorted }

  /// Number of values in the snapshot.
  let size s = s.values.Length

  // TODO: when I need to read values multiple times:
  // memoized snapshot to avoid recalculation of values after reading:

  /// The value at quantile `q`, linearly interpolated between neighbouring
  /// values. Returns 0 for an empty snapshot.
  /// Raises `ArgumentException` when `q` is outside [0, 1].
  let quantile q s =
    if q < 0. || q > 1. then
      invalidArg "q" "quantile is not in [0., 1.]"
    let n = size s
    if n = 0 then 0.
    else
      let pos = q * float (n + 1)
      if pos < 1. then float s.values.[0]
      elif pos >= float n then float s.values.[n - 1]
      else
        let idx = int pos
        let lower = s.values.[idx - 1]
        let upper = s.values.[idx]
        float lower + (pos - floor pos) * float (upper - lower)

  let median = quantile 0.5
  let percentile75th = quantile 0.75
  let percentile95th = quantile 0.95
  let percentile98th = quantile 0.98
  let percentile99th = quantile 0.99
  let percentile999th = quantile 0.999

  /// The sorted values backing the snapshot.
  let values s = s.values

  /// Smallest value, or 0 for an empty snapshot.
  let min s =
    // the values are sorted, so the minimum is the first element
    if size s = 0 then 0L else s.values.[0]

  /// Largest value, or 0 for an empty snapshot.
  let max s =
    // the values are sorted, so the maximum is the last element
    if size s = 0 then 0L else s.values.[size s - 1]

  let private meanAndSum s =
    if size s = 0 then 0., 0.
    else
      let sum = s.values |> Array.sumBy float
      sum / float s.values.Length, sum

  /// Arithmetic mean, or 0 for an empty snapshot.
  let mean = fst << meanAndSum

  /// Sample standard deviation (n - 1 denominator). Returns 0 when the
  /// snapshot holds fewer than two values, where the estimator is undefined
  /// and the division would otherwise yield NaN.
  let stdDev s =
    let n = size s
    if n <= 1 then 0.
    else
      let mean = mean s
      let squares =
        s.values |> Array.sumBy (fun d -> Math.Pow (float d - mean, 2.))
      sqrt (squares / float (n - 1))
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Metric =

  type T =
    private { updateCh : Ch<Value * Units>
              tickCh   : Ch<unit>
              outputs  : Stream.Src<Message> }

  /// Create a metric whose state is held by a server loop.
  ///
  /// - `reduce` folds each value given through `update` into the state
  /// - `initial` is the starting state
  /// - `handleTick` is called on every `tick`; it returns the next state and
  ///   the messages to publish on the metric's output stream
  let create (reduce : 'state -> Value * Units -> 'state)
             initial
             (handleTick : 'state -> 'state * Message list)
             : Job<T> =
    let metric =
      { updateCh = Ch ()
        tickCh   = Ch ()
        outputs  = Stream.Src.create () }

    // chain one more publication after everything queued so far
    let publishAfter (soFar : Job<unit>) (message : Message) : Job<unit> =
      soFar >>= fun () -> Stream.Src.value metric.outputs message

    let publishAll (messages : Message list) : Job<unit> =
      List.fold publishAfter (Job.result ()) messages

    let loop (state : 'state, carried : Message list) =
      Alt.choose [
        metric.tickCh ^=> fun () ->
          let nextState, fresh = handleTick state
          job {
            // first the messages carried in the loop state, then the new ones
            do! publishAll carried
            do! publishAll fresh
            return nextState, []
          }

        metric.updateCh ^-> fun sample ->
          reduce state sample, carried
      ]

    Job.iterateServer (initial, []) loop >>-. metric

  /// Ask the metric to run its tick handler and publish the result.
  let tick m =
    m.tickCh *<- ()

  /// Give a new `(value, units)` sample to the metric.
  let update (value, units) m =
    Ch.give m.updateCh (value, units)

  /// Start a job that feeds every element of the stream `other` into `m`.
  let pipe m other =
    Job.start (Stream.iterJob (fun sample -> update sample m) other)

  /// The `(value, units)` pairs of the messages published by the metric.
  let values m =
    let published = Stream.Src.tap m.outputs
    Stream.mapFun (Optic.get value_) published

  /// The messages published by the metric.
  let messages m = Stream.Src.tap m.outputs
/// Mutable uniform distribution: a fixed-size reservoir of 1024 slots that is
/// filled first and thereafter overwritten at random positions.
module Uniform =

  type T =
    private { count  : int64
              values : int64 [] }

  /// Create an empty reservoir.
  let create () =
    { count  = 0L
      values = Array.zeroCreate 1024 }

  /// Take a snapshot of the values recorded so far.
  let snapshot r =
    // only the first `count` slots hold data until the reservoir is full
    let filled = int (min r.count (int64 r.values.Length))
    Snapshot.create (Array.take filled r.values)

  /// Record `value`. The backing array is mutated in place; the returned
  /// state carries the incremented count.
  let update r value =
    let count' = r.count + 1L
    if count' <= int64 r.values.Length then
      r.values.[int count' - 1] <- value
    else
      // Clamp while still in int64: converting first truncates counts above
      // Int32.MaxValue to a negative bound, which makes Random.Next throw.
      // NOTE(review): beyond Int32.MaxValue samples the replacement
      // probability stops shrinking with the count.
      let bound = int (min count' (int64 Int32.MaxValue))
      let slot = Random.nextIntMax bound
      if slot < r.values.Length then
        r.values.[slot] <- value
    { r with count = count' }
/// An exponentially weighted moving average that gets ticked every
/// period (a period is a duration between events), but can get
/// updates at any point between the ticks.
module ExpWeightedMovAvg =

  /// Calculate the alpha coefficient, `1 - exp (-tickPeriod / window)`.
  ///
  /// - `window` is the duration the EWMA should be calculated over
  /// - `tickPeriod` is how long there is between each tick
  let internal calculateAlpha (window : Duration) (tickPeriod : Duration) =
    1. - exp (- (float tickPeriod.Ticks / float window.Ticks))

  type T =
    private { inited    : bool
              /// in samples per nanosecond
              rate      : float
              uncounted : int64
              /// Alpha is dependent on the duration between sampling events ("how long
              /// time is it between the data points") so they are given as a pair.
              alpha     : float
              /// interval in nanoseconds
              interval  : float }

  /// Create a new EWMA state that you can do `update` and `tick` on.
  ///
  /// - `window` – the duration to average over
  /// - `tickPeriod` – how often the state is ticked
  let create (window : Duration, tickPeriod : Duration) =
    // argument order must follow calculateAlpha's (window, tickPeriod);
    // swapping them yields exp (-window / tickPeriod) and an alpha of ~1
    let alpha = calculateAlpha window tickPeriod
    { inited    = false
      rate      = 0.
      uncounted = 0L
      alpha     = alpha
      interval  = float tickPeriod.nanoSeconds }

  /// Add `value` to the count accumulated since the last tick.
  let update state value =
    { state with uncounted = state.uncounted + value }

  /// Fold the count accumulated since the last tick into the rate and reset
  /// the count. The first tick seeds the rate with the instantaneous rate.
  let tick state =
    let instantRate = float state.uncounted / state.interval
    let rate' =
      if state.inited then
        state.rate + state.alpha * (instantRate - state.rate)
      else
        instantRate
    { state with uncounted = 0L
                 inited    = true
                 rate      = rate' }

  /// The current rate, scaled from per-nanosecond to per `inUnit`.
  let rate (inUnit : Duration) state =
    state.rate * float inUnit.nanoSeconds
/// Factories for metrics. Each factory returns a function from the tick
/// period to a job that creates the metric.
module Metrics =

  /// A counter that sums the values it is given and, on every tick, emits the
  /// sum as one message and resets to zero. The sum saturates: it is left
  /// unchanged once it sits at `Double.MaxValue` (for positive updates) or
  /// `Double.MinValue` (for negative updates).
  let counter derivedName units : (* tickPeriod *) Duration -> Job<Metric.T> =
    fun (_tickPeriod : Duration) ->
      let name : PointName = pn derivedName

      let reducer (state : float) (delta : Value, _ : Units) =
        let pinnedHigh = state = Double.MaxValue && delta > 0.
        let pinnedLow = state = Double.MinValue && delta < 0.
        if pinnedHigh || pinnedLow then state else state + delta

      let ticker acc =
        0., [ Message.derivedWithUnit name acc units ]

      Metric.create reducer 0. ticker

  /// A uniform reservoir that, on every tick, emits one message per selector
  /// applied to a snapshot of the reservoir.
  let uniform derivedName units (selectors : (Snapshot.T -> int64) list) =
    fun (_tickPeriod : Duration) ->
      let name : PointName = pn derivedName

      // Annotated like `ewma`'s reducer: without it `int64 value` fixes the
      // sample type to `int`, which does not match Metric.create's
      // `Value * Units`.
      let reducer state (value : Value, _ : Units) =
        Uniform.update state (int64 value)

      let ticker state =
        let snapshot = Uniform.snapshot state
        let messages =
          selectors
          |> List.map (fun select ->
            Message.derivedWithUnit name (float (select snapshot)) units)
        state, messages

      Metric.create reducer (Uniform.create ()) ticker

  /// An exponentially weighted moving average over `window` that, on every
  /// tick, emits the current rate per second.
  /// NOTE(review): the ticker only reads the rate; `ExpWeightedMovAvg.tick`
  /// is never applied to the state here — confirm whether that is intended.
  let ewma derivedName units window =
    fun (tickPeriod : Duration) ->
      let perSecond = Duration.FromSeconds 1L
      let name : PointName = pn derivedName

      let reducer state (value : Value, _ : Units) =
        ExpWeightedMovAvg.update state (int64 value)

      let ticker state =
        let perSecondRate = ExpWeightedMovAvg.rate perSecond state
        state, [ Message.derivedWithUnit name perSecondRate units ]

      Metric.create reducer (ExpWeightedMovAvg.create (window, tickPeriod)) ticker
/// Configuration given to `Engine.create`.
type EngineConf =
  {
    /// handed to every registered metric factory as its tick period
    tickInterval : Duration
    /// called to read the current instant when the engine is started
    timestamp    : unit -> Instant
  }
module Engine =

  /// Handle to an engine that has been started.
  type Running =
    private {
      shutdownCh : Ch<unit>
      pauseCh    : Ch<unit>
      startCh    : Ch<unit>
    }

  /// Handle to an engine that is still accepting metric registrations.
  type Configuring =
    private {
      shutdownCh : Ch<unit>
      registerCh : Ch<Duration -> Job<Metric.T>>
      startCh    : Ch<IVar<Running>>
    }

  /// Create a new CEP engine
  ///
  /// The engine's server loop is started as a separate job, so the returned
  /// job completes with the `Configuring` handle immediately.
  let create (conf : EngineConf) : Job<Configuring> =
    let shutdown : Ch<unit> = Ch ()
    let register : Ch<Duration -> Job<Metric.T>> = Ch ()
    let start : Ch<IVar<Running>> = Ch ()
    let resume : Ch<unit> = Ch ()
    let pause : Ch<unit> = Ch ()
    let tick : Ch<unit> = Ch ()
    let events : Ch<Message> = Ch ()

    // the handle given back to whoever starts the engine
    let running : Running =
      { shutdownCh = shutdown
        pauseCh    = pause
        startCh    = resume }

    let rec stopped (pending : Job<Metric.T> list) : Alt<unit> =
      Alt.choose [
        shutdown :> Alt<unit>

        register ^=> fun metric ->
          let mJ = metric conf.tickInterval
          stopped (mJ :: pending)

        start ^=> fun reply ->
          let now = conf.timestamp ()
          // answer the caller of `start`; without this its Alt never commits
          IVar.fill reply running
          >>=. Job.conCollect pending
          >>= started now
      ]

    and paused (metrics : ResizeArray<Metric.T>) : Alt<unit> =
      Alt.choose [
        shutdown :> Alt<unit>

        resume ^=> fun () ->
          started (conf.timestamp ()) metrics

        start ^=> fun reply ->
          let now = conf.timestamp ()
          IVar.fill reply running >>=. started now metrics
      ]

    and started (startedAt : Instant) (metrics : ResizeArray<Metric.T>) : Alt<unit> =
      Alt.choose [
        shutdown :> Alt<unit>

        pause ^=>. paused metrics

        events ^=> fun _ ->
          // TODO: handle injected Messages, reduce over all metrics
          started startedAt metrics

        tick ^=> fun _ ->
          // TODO: tick all metrics
          started startedAt metrics
      ]

    // Run the loop concurrently: waiting on it inline would block until
    // shutdown, and nobody could reach the channels to send one.
    Job.start (stopped []) >>-.
    { shutdownCh = shutdown
      registerCh = register
      startCh    = start }

  /// Inject a message into the engine. Currently a no-op stub.
  let inject (m : Message) =
    Job.result ()

  /// Start the engine; the alternative commits with the `Running` handle.
  let start (e : Configuring) : Alt<Running> =
    e.startCh *<+=>- fun inst -> inst

  /// Pause a running engine.
  let pause (e : Running) =
    e.pauseCh *<- ()

  /// Register a metric factory with an engine that has not been started yet.
  let register metricFactory (e : Configuring) =
    // TODO: consider how to match injected events with these metrics which
    // can listen to events. PointName matching?
    e.registerCh *<- metricFactory

  /// Subscribe to the messages of a point name. Currently a stub that yields
  /// an empty stream.
  let subscribe (pn : PointName) : Job<Stream<Message>> =
    Job.result Stream.nil
/// Example wiring of metrics into an engine.
module Usage =

  /// Create an engine with a 500 ms tick interval and register the login
  /// counter with it. Returns a job yielding the configured engine.
  let bootstrap () =
    let lps = Metrics.counter "Usage.Authentication.login" "login/s"
    // NOTE(review): the two factories below are built but not registered yet;
    // `dbDist` still needs its selector list before it can be.
    let lpsAvgs = Metrics.ewma "Usage.Authentication.login.ewma" "login/s" (Duration.FromMinutes 5L)
    let dbDist = Metrics.uniform "Usage.DB.read" "s"

    let conf =
      { tickInterval = Duration.FromMilliseconds 500L
        timestamp    = fun () -> SystemClock.Instance.Now }

    job {
      let! engine = Engine.create conf
      // `register` takes the factory and then the engine; leaving the engine
      // out binds a function instead of a job
      do! Engine.register lps engine
      return engine
    }

  let TODO = ()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment