Skip to content

Instantly share code, notes, and snippets.

@haf
Created September 7, 2016 13:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save haf/dc6b83aa153908efddc5f341b29fdbdc to your computer and use it in GitHub Desktop.
Save haf/dc6b83aa153908efddc5f341b29fdbdc to your computer and use it in GitHub Desktop.
Logary Eventing Spike
namespace Logary.Eventing
open System
open NodaTime
open Hopac
open Hopac.Infixes
open Aether
/// Stubs for the rest of Logary's API.
///
/// Auto-opened, so everything below (including the nested `Random` module
/// and the `Duration` extension) is in scope for the rest of the namespace.
[<AutoOpen>]
module Reffed =

  /// Swap the first two arguments of a curried function.
  let flip f a b = f b a

  type Value = float
  type Units = string
  type PointName = string []

  /// Split a dot-separated name into its non-empty segments.
  ///
  /// Raises `ArgumentException` when no segment remains (e.g. `""` or `"."`).
  let pn (name : string) =
    match name.Split([| '.' |], StringSplitOptions.RemoveEmptyEntries) with
    | [||] -> invalidArg "name" "is empty"
    | xs -> xs

  /// Ticks (100 ns units) of 1970-01-01T00:00:00Z on the `DateTime` scale.
  let private unixEpochTicks =
    DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks

  /// A single measurement: when it was taken, what it is called and its
  /// value together with the unit of that value.
  type Message =
    { timestamp : int64
      name : PointName
      value : Value * Units }

    /// Create a message stamped with the current UTC time, expressed in
    /// nanoseconds since the Unix epoch.
    ///
    /// The epoch is subtracted before scaling: `DateTime.UtcNow.Ticks * 100L`
    /// counts from year 0001 and silently overflows int64.
    static member derivedWithUnit name v u =
      { timestamp = (DateTime.UtcNow.Ticks - unixEpochTicks) * 100L
        name = name
        value = v, u }

  /// Getter/setter pair (lens) over `Message.value`.
  let value_ =
    (fun (x : Message) -> x.value),
    (fun v (x : Message) -> { x with value = v })

  type Named =
    abstract name : PointName

  module internal Random =
    let private BitsPerLong = 63

    /// One generator per thread; `System.Random` is not thread-safe.
    let random = new Threading.ThreadLocal<_>(fun () -> System.Random())

    /// Per-thread scratch buffer for `nextInt64`. It has to be thread-local
    /// as well, otherwise concurrent callers overwrite each other's bytes.
    let private buf =
      new Threading.ThreadLocal<byte []>(fun () -> Array.zeroCreate sizeof<int64>)

    /// get the next int within [Int32.MinValue, Int32.MaxValue)
    let nextInt () =
      random.Value.Next (Int32.MinValue, Int32.MaxValue)

    /// get the next int within [0, max) – the upper bound is exclusive
    let nextIntMax (max : int) =
      random.Value.Next max

    /// get the next int64, uniformly over the whole int64 range
    let nextInt64 () =
      let bytes = buf.Value
      random.Value.NextBytes bytes
      BitConverter.ToInt64 (bytes, 0)

    /// get the next int64 within [0, max) – the upper bound is exclusive
    ///
    /// Raises `ArgumentException` when `max` is not positive.
    let nextInt64Max (max : int64) =
      if max <= 0L then
        invalidArg "max" "must be positive"
      let mutable bits = 0L
      let mutable value = 0L
      let mutable first = true
      // Rejection sampling: retry while the candidate falls into the
      // incomplete last bucket (detected through the overflow to negative),
      // which would otherwise bias the result towards small values.
      while first || bits - value + (max - 1L) < 0L do
        // clear the sign bit so that `bits` is non-negative
        bits <- nextInt64 () &&& (~~~(1L <<< BitsPerLong))
        value <- bits % max
        first <- false
      value

  type Duration with
    /// The duration in nanoseconds (one tick is 100 ns).
    member x.nanoSeconds =
      x.Ticks * 100L
/// A sorted set of sampled values with summary statistics over them.
module Snapshot =

  type T =
    private { values : int64 [] }

  /// Create a snapshot from `unsorted`.
  ///
  /// Note: the array is sorted in place and kept by reference, so the
  /// caller's array is reordered and must not be mutated afterwards.
  let create unsorted =
    Array.sortInPlace unsorted
    { values = unsorted }

  /// Number of values in the snapshot.
  let size (s : T) = s.values.Length

  // TODO: when I need to read values multiple times:
  // memoized snapshot to avoid recalculation of values after reading:

  /// Value at quantile `q` (in [0, 1]), linearly interpolated between the
  /// two neighbouring samples. Returns 0 for an empty snapshot.
  ///
  /// Raises `ArgumentException` when `q` is NaN or outside [0, 1].
  let quantile q (s : T) =
    // NaN compares false against everything, so it needs its own check;
    // otherwise it would flow into the index computation below.
    if Double.IsNaN q || q < 0. || q > 1. then
      invalidArg "q" "quantile is not in [0., 1.]"
    let n = size s
    if n = 0 then
      0.
    else
      let pos = q * float (n + 1)
      if pos < 1. then
        float s.values.[0]
      elif pos >= float n then
        float s.values.[n - 1]
      else
        let lower = s.values.[int pos - 1]
        let upper = s.values.[int pos]
        float lower + (pos - floor pos) * float (upper - lower)

  let median = quantile 0.5
  let percentile75th = quantile 0.75
  let percentile95th = quantile 0.95
  let percentile98th = quantile 0.98
  let percentile99th = quantile 0.99
  let percentile999th = quantile 0.999

  /// The underlying (sorted) values.
  let values (s : T) = s.values

  /// Smallest value, or 0 for an empty snapshot. The values are sorted by
  /// `create`, so this is the first element.
  let min (s : T) = if size s = 0 then 0L else s.values.[0]

  /// Largest value, or 0 for an empty snapshot. The values are sorted by
  /// `create`, so this is the last element.
  let max (s : T) = if size s = 0 then 0L else s.values.[size s - 1]

  let private meanAndSum (s : T) =
    if size s = 0 then
      0., 0.
    else
      let sum = s.values |> Array.sumBy float
      sum / float s.values.Length, sum

  /// Arithmetic mean, or 0 for an empty snapshot.
  let mean = fst << meanAndSum

  /// Sample standard deviation (n - 1 in the denominator).
  ///
  /// Returns 0 when there are fewer than two values: a single value would
  /// otherwise evaluate 0/0 and yield NaN.
  let stdDev (s : T) =
    let n = size s
    if n <= 1 then
      0.
    else
      let avg = mean s
      let sumOfSquares =
        s.values |> Array.sumBy (fun d -> Math.Pow (float d - avg, 2.))
      sqrt (sumOfSquares / float (n - 1))
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Metric =

  /// Handle to a running metric server: one channel for samples, one for
  /// ticks, and the stream source that messages are published on.
  type T =
    private { updateCh : Ch<Value * Units>
              tickCh : Ch<unit>
              outputs : Stream.Src<Message> }

  /// Start a metric server and return its handle.
  ///
  /// - `reduce` folds every sample given on the update channel into the state
  /// - `initial` is the starting state
  /// - `handleTick` is called for every tick; it returns the next state and
  ///   the messages to publish on the output stream
  let create (reduce : 'state -> Value * Units -> 'state)
             initial
             (handleTick : 'state -> 'state * Message list)
             : Job<T> =
    let metric =
      { updateCh = Ch ()
        tickCh = Ch ()
        outputs = Stream.Src.create () }

    // Publish the batch on the output stream, one message after the other.
    let publishAll (batch : Message list) : Job<unit> =
      List.fold
        (fun sent msg -> sent >>= fun () -> Stream.Src.value metric.outputs msg)
        (Job.result ())
        batch

    // On a tick: compute the new state, flush what was held back, then
    // flush what the tick produced and continue with nothing held back.
    let onTick (state : 'state) (held : Message list) =
      job {
        let state', fresh = handleTick state
        do! publishAll held
        do! publishAll fresh
        return (state', [])
      }

    let loop (state : 'state, held : Message list) =
      Alt.choose [
        (metric.tickCh ^=> fun () -> onTick state held)
        (metric.updateCh ^-> fun sample -> reduce state sample, held)
      ]

    Job.iterateServer (initial, []) loop >>-. metric

  /// Tick the metric; commits once the server has accepted the tick.
  let tick (m : T) =
    m.tickCh *<- ()

  /// Give a sample to the metric; commits once the server has accepted it.
  let update (value, units) (m : T) =
    Ch.give m.updateCh (value, units)

  /// Start a job that feeds every element of the stream `other` into `m`.
  let pipe m other =
    Stream.iterJob (fun sample -> update sample m) other
    |> Job.start

  /// Tap the stream of messages published by the metric.
  let messages (m : T) = Stream.Src.tap m.outputs

  /// Tap the stream of published messages, keeping only their value/unit.
  let values (m : T) =
    Stream.Src.tap m.outputs
    |> Stream.mapFun (Optic.get value_)
/// Mutable uniform distribution: a fixed-size reservoir (1024 slots) that
/// keeps a uniformly random sample of all the values it has been given.
///
/// "Mutable" because `update` writes into the backing array in place; the
/// returned record only carries the new count and shares that array.
module Uniform =

  type T =
    private { count : int64
              values : int64 [] }

  /// Create an empty reservoir.
  let create () =
    { count = 0L
      values = Array.zeroCreate 1024 }

  /// Copy the filled part of the reservoir into a sorted `Snapshot.T`.
  let snapshot r =
    let filled = int (min r.count (int64 r.values.Length))
    Snapshot.create (r.values |> Array.take filled)

  /// Record `value` and return the reservoir with its count incremented.
  ///
  /// While the reservoir is not yet full the value is appended; after that
  /// it replaces a random slot with probability `slots / count`.
  let update r value =
    let count' = r.count + 1L
    if count' <= int64 r.values.Length then
      r.values.[int count' - 1] <- value
    else
      // Clamp while still in int64: converting first (`int count'`) wraps
      // around once the count exceeds Int32.MaxValue and can hand a
      // negative bound to the random generator, which throws.
      let bound = int (min count' (int64 Int32.MaxValue))
      let rnd = Random.nextIntMax bound
      if rnd < r.values.Length then
        r.values.[rnd] <- value
    { r with count = count' }
/// An exponentially weighted moving average that gets ticked every
/// period (a period is a duration between events), but can get
/// updates at any point between the ticks.
module ExpWeightedMovAvg =

  /// Calculate the alpha (smoothing) coefficient.
  ///
  /// - `window` is the duration the EWMA should be calculated over
  /// - `tickPeriod` is how long it is between each tick
  ///
  /// alpha = 1 - exp (-tickPeriod / window), so a tick period that is short
  /// relative to the window gives a small alpha (heavy smoothing).
  let internal calculateAlpha (window : Duration) (tickPeriod : Duration) =
    1. - exp (- (float tickPeriod.Ticks / float window.Ticks))

  type T =
    private { inited : bool
              /// in samples per nanosecond
              rate : float
              /// samples received since the last tick
              uncounted : int64
              /// Alpha is dependent on the duration between sampling events ("how long
              /// time is it between the data points") so they are given as a pair.
              alpha : float
              /// interval in nanoseconds
              interval : float }

  /// Create a new EWMA state that you can do `update` and `tick` on.
  ///
  /// - `window` – average over this duration
  /// - `tickPeriod` – how often is the reservoir ticked
  let create (window : Duration, tickPeriod : Duration) =
    // Argument order matters: passing (tickPeriod, window) inverts the
    // ratio inside the exponent and pushes alpha to ~1, i.e. no smoothing.
    let alpha = calculateAlpha window tickPeriod
    { inited = false
      rate = 0.
      uncounted = 0L
      alpha = alpha
      interval = float tickPeriod.nanoSeconds }

  /// Add `value` samples to the count accumulated since the last tick.
  let update state value =
    { state with uncounted = state.uncounted + value }

  /// Fold the samples accumulated since the last tick into the rate and
  /// reset the accumulator. The first tick seeds the rate with the
  /// instantaneous rate; later ticks move it towards the instantaneous
  /// rate by a factor of alpha.
  let tick state =
    let count = float state.uncounted
    let instantRate = count / state.interval
    let calcRate currentRate alpha instantRate =
      currentRate + alpha * (instantRate - currentRate)
    if state.inited then
      { state with uncounted = 0L
                   rate = calcRate state.rate state.alpha instantRate }
    else
      { state with uncounted = 0L
                   inited = true
                   rate = instantRate }

  /// The current rate expressed in samples per `inUnit`.
  let rate (inUnit : Duration) state =
    state.rate * float inUnit.nanoSeconds
/// Factories for ready-made metrics. Every factory returns a function from
/// the tick period to a job that starts the metric.
module Metrics =

  /// A counter: sums the samples between ticks, emits the sum on every tick
  /// and starts over from zero. The sum saturates at `Double.MaxValue` /
  /// `Double.MinValue` instead of moving further in that direction.
  let counter derivedName units : (* tickPeriod *) Duration -> Job<Metric.T> =
    fun (_ : Duration) ->
      let name : PointName = pn derivedName

      let accumulate (total : float) (delta : Value, _ : Units) =
        let pinnedHigh = total = Double.MaxValue && delta > 0.
        let pinnedLow = total = Double.MinValue && delta < 0.
        if pinnedHigh || pinnedLow then total else total + delta

      let flush total =
        0., [ Message.derivedWithUnit name total units ]

      Metric.create accumulate 0. flush

  /// A uniform reservoir: samples are truncated to int64 and recorded; on
  /// every tick one message is emitted per selector, holding the selector
  /// applied to a snapshot of the reservoir. The reservoir is not reset.
  let uniform derivedName units (selectors : (Snapshot.T -> int64) list) =
    fun (_ : Duration) ->
      let name : PointName = pn derivedName

      let record reservoir (sample : Value, _ : Units) =
        Uniform.update reservoir (int64 sample)

      let report reservoir =
        let snap = Uniform.snapshot reservoir
        let msgs =
          selectors
          |> List.map (fun select ->
            Message.derivedWithUnit name (float (select snap)) units)
        reservoir, msgs

      Metric.create record (Uniform.create ()) report

  /// An exponentially weighted moving average over `window`: samples are
  /// truncated to int64 and counted; every tick emits the current rate per
  /// second.
  let ewma derivedName units window =
    fun (tickPeriod : Duration) ->
      let name : PointName = pn derivedName
      let perSecond = Duration.FromSeconds 1L

      let record avg (sample : Value, _ : Units) =
        ExpWeightedMovAvg.update avg (int64 sample)

      let report avg =
        let perSec = ExpWeightedMovAvg.rate perSecond avg
        avg, [ Message.derivedWithUnit name perSec units ]

      Metric.create record (ExpWeightedMovAvg.create (window, tickPeriod)) report
/// Configuration handed to `Engine.create`.
///
/// - `tickInterval` – passed to every registered metric factory
/// - `timestamp` – clock function, read by the engine when it is started
type EngineConf =
{ tickInterval : Duration
timestamp : unit -> Instant }
/// The (unfinished) CEP engine: a server that moves between the states
/// `stopped`, `started` and `paused`, driven by messages on channels.
module Engine =
/// Handle to an engine that has been started.
type Running =
private {
shutdownCh : Ch<unit>
pauseCh : Ch<unit>
startCh : Ch<unit>
}
/// Handle to an engine that is still being configured; this is what
/// `create` returns and what `register` and `start` operate on.
type Configuring =
private {
shutdownCh : Ch<unit>
registerCh : Ch<Duration -> Job<Metric.T>>
startCh : Ch<IVar<Running>>
}
/// Create a new CEP engine
///
/// Creates the channels and defines the three mutually recursive states.
/// Only `shutdown`, `register` and `start` end up in the returned record;
/// `pause`, `tick` and `events` stay local to this function.
let create (conf : EngineConf) : Job<Configuring> =
let shutdown = Ch ()
let register = Ch ()
let start = Ch ()
let pause = Ch ()
let tick = Ch ()
let events = Ch ()
// Initial state: collects metric factories until started or shut down.
let rec stopped (pending : Job<Metric.T> list) =
Alt.choose [
shutdown :> Alt<unit>
register ^=> fun metric ->
// apply the factory to the configured tick interval; the resulting job
// is not run until the engine is started
let mJ = metric conf.tickInterval
stopped (mJ :: pending)
start ^=> fun _ ->
// NOTE(review): the value received on `start` is discarded; given the
// channel type it is the IVar<Running> of the caller, which then never
// gets filled here – confirm
let now = conf.timestamp ()
// run all pending metric jobs concurrently, then enter `started`
Job.conCollect pending >>= started now
]
// Paused state: only reacts to `start`, which resumes with the same metrics.
and paused metrics =
start ^=> fun _ ->
let now = conf.timestamp ()
started now metrics
// Running state: can be shut down or paused; events and ticks are
// currently consumed without any effect.
and started (startedAt : Instant) (metrics : ResizeArray<Metric.T>) =
Alt.choose [
shutdown :> Alt<unit>
pause ^=>. paused metrics
events ^=> fun _ ->
// TODO: handle injected Messages, reduce over all metrics
started startedAt metrics
tick ^=> fun _ ->
// TODO: tick all metrics
started startedAt metrics
]
// NOTE(review): this sequences the record after the `stopped []`
// alternative itself rather than after starting it as a separate job;
// it looks like `create` cannot yield the handle before the state machine
// finishes – confirm whether `Job.start (stopped [])` was intended.
stopped [] >>-.
{ shutdownCh = shutdown
registerCh = register
startCh = start }
/// Stub: ignores the message and returns a job that completes immediately.
let inject (m : Message) =
Job.result ()
/// Ask the engine to start by sending an IVar on its start channel; the
/// result is whatever gets written to that IVar.
let start (e : Configuring) : Alt<Running> =
e.startCh *<+=>- fun inst -> inst
/// Give a pause request to a running engine.
let pause (e : Running) =
e.pauseCh *<- ()
/// Give a metric factory to an engine that is being configured.
let register metricFactory (e : Configuring) =
// TODO: consider how to match injected events with these metrics which
// can listen to events. PointName matching?
e.registerCh *<- metricFactory
/// Stub: the point name is ignored.
// NOTE(review): the body is a bare `Stream.Cons.Nil`, which does not look
// like a Job<Stream<Message>> – confirm the intended return value.
let subscribe (pn : PointName) : Job<Stream<Message>> =
Stream.Cons.Nil
/// Example of how the pieces are meant to be wired together.
module Usage =

  /// Create an engine and register the login counter with it.
  ///
  /// Returns a job yielding the engine, still in its configuring state.
  let bootstrap () =
    let lps = Metrics.counter "Usage.Authentication.login" "login/s"
    // The two factories below are declared for illustration only; they are
    // not registered with the engine yet (`dbDist` still lacks its list of
    // snapshot selectors).
    let lpsAvgs = Metrics.ewma "Usage.Authentication.login.ewma" "login/s" (Duration.FromMinutes 5L)
    let dbDist = Metrics.uniform "Usage.DB.read" "s"
    let conf =
      { tickInterval = Duration.FromMilliseconds 500L
        timestamp = fun () -> SystemClock.Instance.Now }
    job {
      let! engine = Engine.create conf
      // `register` takes the engine as its last argument; without it the
      // expression is a function, not a job that `do!` can run.
      do! Engine.register lps engine
      return engine
    }

  let TODO = ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment