juttle forecast modules: forecast

Forecast

One-step-ahead predictions for timeseries using exponential smoothing

Forecast threads a smooth curve through a noisy timeseries in a way that lets you visualize trends, cycles, and anomalies. It can be used as part of an automatic anomaly detection system for metric timeseries (that is, sequences of timestamped numerical values).

It accomplishes this using a variation of Holt-Winters forecasting -- more generally known as exponential smoothing. Forecast decomposes a noisy signal into level, trend, repetitive "seasonal" effects, and unexplained variation or noise. The result is a smoothed version of the signal which may be used to forecast future values or detect unexpected variation. This approach has been successfully used for network anomaly detection with other monitoring tools. Here we implement the method in pure Juttle, and our ability to combine historic and realtime data in a single query lets us initialize the forecaster from recent data at startup.
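For reference, here is the error-correction form of additive Holt-Winters that the code follows -- a sketch, writing the smoothing factors slevel, strend, sdaily as $\alpha$, $\beta$, $\gamma$ and assuming one observation per unit time step:

$$\hat y_t = \ell_{t-1} + b_{t-1} + s_{t-m}, \qquad e_t = y_t - \hat y_t$$

$$\ell_t = \ell_{t-1} + b_{t-1} + \alpha e_t, \qquad b_t = b_{t-1} + \alpha \beta e_t, \qquad s_t = s_{t-m} + \gamma e_t$$

The level and trend updates below match this form (scaled by the actual inter-arrival interval), while the seasonal update departs from the textbook recursion as described in the code comments.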

// forecast.juttle: exponential smoother/forecaster with de-seasonalization, based on Holt-Winters.
// This models an unobserved level and trend component in a noisy signal, along with
// optional "seasonal" effects at the level of day-over-day variation.
// The result is a smoothed version of the signal which may be used to forecast
// future values or detect unexpected variation.
//
// There are two useful pieces here:
// 1. a reducer named forecast, the holt-winters forecaster, called like this:
// ... | put state = forecast(Y, slevel, strend, sdaily) | ...
//
// 2. a proc named forecastPI (which calls forecast for you and emits some useful
// points based on its result), called like this:
// ... | forecastPI(Y, z, alpha, slevel, strend, sdaily) | ...
//
// See the comments above each for details on parameters and output
//
import "interp.juttle" as interp;
import "sources.juttle" as sources;
// forecast: compute holt-winters forecast of a field as it varies over time.
//
// parameters:
//
// Y is the point field to smooth.
// slevel, strend, sdaily are smoothing factors, numbers in the range [0..1].
// They determine how quickly the smoother adjusts to new values as they arrive.
// Setting a factor to 0 freezes the feature at its initial estimate,
// while setting a factor to 1 causes that feature to depend solely
// on the most recent point. Setting strend or sdaily to null
// removes that feature from the model.
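// (As in simple exponential smoothing, a factor s weights the observation
// k steps back by s*(1-s)^k, so a feature's effective memory is roughly
// the most recent 1/s points.)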
//
// returns:
//
// forecast returns the forecast of the current value (using state prior to the
// current value) and its internal state estimates, as an array:
// [value, level, trend, daily]
//
// forecast assumes arriving points are equally spaced in time.
// level is the estimate of the deseasonalized series value, trend the estimate of the
// deseasonalized series slope. daily is the estimated seasonal factor
// to be used at the next occurrence of the current position in the cycle.
//
// This implementation differs from additive Holt-Winters in its handling of
// seasonality. It independently fits a C1 piecewise cubic curve to
// day-over-day values, and maintains this via gradient descent on prediction error
// rather than as part of a recursive Winters update (the small changes to make this
// a Winters-style update are commented in the code below).
//
export reducer forecast(Y, slevel, strend, sdaily) {
var level, trend, day, daily, time, yhat;
var initial = true, n_daily=6;
function predict() {
return level + trend + day;
}
function update() {
var err, Wday, i_daily, W, du;
if (initial) {
yhat = *Y ;
if (sdaily == null) {
day = 0;
} else {
day = *Y * sdaily / (slevel + sdaily);
daily = [day,day,day,day,day,day];
}
level = *Y - day;
trend = 0;
time = *"time";
initial = false;
} else {
// error-correction form of Holt-Winters, e.g., https://www.otexts.org/fpp
// du = 1 ; // Winters-style: don't scale by interval
du = (*"time" - time) / :hour:;
time = *"time";
if (sdaily != null) {
// get the previous daily factors for this time
Wday = interp.project_cubic(*"time", daily, n_daily, :day:);
day = Wday[0];
i_daily = Wday[1];
W = Wday[2];
}
yhat = predict() ; // 1-step forecast from previous state as reducer return value
err = *Y - yhat;
if (sdaily != null) {
// update day and project the change back onto the daily array
// var derr = err ; // Winters-style: recursive update sees level and trend error
var derr = (*Y - day);
var dday = sdaily * derr * du;
day = day + dday;
daily[i_daily] = daily[i_daily] + dday * W[0];
daily[(i_daily+1) % n_daily] = daily[(i_daily+1) % n_daily] + dday * W[1];
daily[(i_daily+2) % n_daily] = daily[(i_daily+2) % n_daily] + dday * W[2];
daily[(i_daily+3) % n_daily] = daily[(i_daily+3) % n_daily] + dday * W[3];
}
if (strend != null) {
trend = trend + slevel * strend * err * du;
}
level = level + trend + slevel * err * du;
}
}
function result() {
return [yhat, level, trend, day ];
}
function reset() {
initial = true;
}
}
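// Example (a sketch; 'cpu' is an illustrative field name):
// ... | put state = forecast('cpu', .1, .01, .2)
// | put cpu_hat = state[0], cpu_level = state[1], cpu_trend = state[2], cpu_day = state[3]
// | ...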
// derivative (d/dt) of a timeseries, approximated
// as a backwards difference of Y vs the time field.
reducer ddt(Y, units) {
var y, time, dy, dt;
var initial = true;
function update() {
if (initial) {
dy = 0;
y = *Y;
dt = :1s:;
time = *"time";
initial = false;
} else {
dy = *Y - y ;
y = *Y;
dt = *"time" - time;
time = *"time";
}
}
function result() {
return dy * (units / dt);
}
function reset() {
initial = true;
}
}
// Compute a forecast along with prediction intervals based
// on a smoothed stderr estimate.
//
// parameters:
//
// Y: field to forecast.
// z: number of standard deviations from the mean to include in the prediction
// interval (z = 1.96 is the 97.5th percentile and thus a 95% confidence bound,
// if errors were independent and normally distributed (often untrue)).
// alpha: stderr smoothing factor (0..1): how quickly the error envelope recovers from an anomaly.
// slevel, strend, sdaily: smoothing factors for Holt-Winters, passed to the forecast() reducer
//
// result:
//
// For a field name Y, this populates the current point with:
// Y_pred: predicted next y value
// Y_level: de-seasonalized prediction
// Y_trend: predicted next slope
// Y_day: predicted next daily seasonal factor
// Y_err: estimated stderr (RMS prediction error over time)
// Y_derr: time derivative of stderr
// Y_hi, Y_low: prediction interval bounds
//
// For simple anomaly detection, test whether Y_low <= Y <= Y_hi and alert if not.
// Or test Y_derr against a fixed threshold (derr adapts to current variation, so
// a fixed derr threshold is really an adaptive alert threshold). The alpha parameter
// adjusts the sensitivity of Y_derr.
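// For example (a sketch; the field 'cpu' and parameter values are illustrative):
// ... | forecastPI -Y 'cpu' -z 1.96 -alpha .5 -slevel .1 -strend null -sdaily .2
// | put anomaly = cpu > cpu_hi || cpu < cpu_low
// | filter anomaly == true
// | ...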
//
export sub forecastPI(Y, z, alpha, slevel, strend, sdaily) {
put state = forecast(Y, slevel, strend, sdaily)
| put err2 = (*Y - state[0]) * (*Y - state[0])
| put stderr = Math.sqrt(forecast(err2, alpha, null, null)[0])
| put *(Y+"_pred") = state[0]
| put *(Y+"_level") = state[1]
| put *(Y+"_trend") = state[2]
| put *(Y+"_day") = state[3]
| put *(Y+"_err") = stderr
| put *(Y+"_derr") = ddt(stderr, :hour:)
| put *(Y+"_hi") = state[0] + z * stderr
| put *(Y+"_low") = state[0] - z * stderr
| remove state, err2, stderr
}
// Demo: forecasting for simulated CPU
//
// This demo performs a smoothing forecast including a continuous
// model of repeated day-to-day variation. It displays timeseries of
// the original input, the predicted/smoothed input, a 95% confidence
// interval around this prediction, and the stderr. It also displays
// the prediction components: level, trend and daily variation.
// The seasonal component needs to see a few days' data to settle in.
//
sources.bumpy_cpu -from :2014-01-01: -to :2014-01-08:
| forecastPI -Y "cpu" -z 1.96 -alpha .5 -slevel .1 -strend null -sdaily .2
|( keep cpu, cpu_pred, cpu_level, cpu_trend, cpu_day, cpu_err, cpu_derr, time
| split cpu, cpu_pred, cpu_level, cpu_trend, cpu_day, cpu_err, cpu_derr
| @timechart -display.duration :3d:
; keep cpu, cpu_pred, cpu_low, cpu_hi, time
| split cpu, cpu_pred, cpu_low, cpu_hi
| @timechart -display.duration :3d:
)
// interp.juttle: periodic interpolation functions.
//
// Functions to generate continuous periodic values that interpolate specified
// discrete periodic values.
//
// Exported subs:
//
// Exported functions:
// project_time(time, n, period)
// pwl(time, A, len, period)
// cubic(time, A, len, period)
// project_cubic(time, A, len, period)
//
// Exported reducers:
//
//////////////////////////////////////////////////////////////////////////////////////
//
// project_time: project time onto a cycle of intervals of duration period/n.
// Return [idx, u] where 0 <= idx < n is the periodic interval this
// time falls into, and 0 <= u < 1 is the coordinate within the
// unit interval
//
export function project_time(time, n, period) {
var interval = period / n;
var t0 = Date.quantize(time, interval);
var u = (time - t0) / interval;
var idx = Math.floor((t0 - Date.new(0)) / interval) % n;
return [idx, u];
}
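// A worked example (times are UTC; Date.new(0) is the epoch): with n=6 and
// period :day:, each interval is 4 hours. A time of 1970-01-02T06:00 quantizes
// to t0 = 1970-01-02T04:00, so u = 2h/4h = 0.5 and
// idx = Math.floor(28h / 4h) % 6 = 7 % 6 = 1.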
//
// pwl: piecewise-linear interpolation (periodic)
// interpolate values to get a continuous periodic function of time composed
// of lines connecting the values in A.
// A is an array of samples representing an overall cycle of duration period
// with samples uniformly spaced and s.t. A[0] should be the value at t==0.
//
export function pwl(time, A, len, period) {
var IDX = project_time(time, len, period);
var idx=IDX[0], u=IDX[1];
return (1 - u) * A[idx] + u * A[ (idx+1) % len ];
}
//
// catrom_weights: weighting for successive points for a given 0 <= u < 1
//
function catrom_weights(u) {
var u2 = u*u;
var u3 = u2 * u;
var W = [-u/2 + u2 - u3/2,
1 - 2.5 * u2 + 1.5 * u3,
u / 2 + 2 * u2 - 1.5 * u3,
-u2 / 2 + u3 / 2];
return W;
}
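// (These are the Catmull-Rom basis weights: they sum to 1 for every u, and at
// u == 0 they reduce to [0, 1, 0, 0], so the interpolated curve passes through
// the sample points.)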
//
// project_cubic: cubic spline interpolation along with segment index and weights
// Return an array [val, idx, W], where
// val == W[0]*A[idx] + W[1]*A[idx+1] + W[2]*A[idx+2] + W[3]*A[idx+3], indexed modulo len
//
export function project_cubic(time, A, len, period) {
var IDX = project_time(time, len, period);
var idx=(IDX[0] + len - 1) % len, u=IDX[1];
var W = catrom_weights(u);
var val = W[0]*A[idx] + W[1]*A[(idx+1) % len] + W[2]*A[(idx+2) % len] + W[3]*A[(idx+3) % len];
return [val, idx, W];
}
//
// cubic: cubic spline interpolation (periodic)
// interpolate values to get a smooth periodic function of time.
// A is an array of samples representing an overall cycle of duration period
// with samples uniformly spaced and s.t. A[0] should be the value at t==0.
//
export function cubic(time, A, len, period) {
return project_cubic(time, A, len, period)[0];
}
//
// demo: draw two periods of a cycle, interpolating linearly and cubically
//
const values = [10, 5, 20, 15, 30];
const N = 5;
emitter -start 0 -limit 60
| put PWL = pwl(time, values, N, :30s:)
| put curve = cubic(time, values, N, :30s:)
| split
| @timechart
// Forecasting and anomaly detection example using the Twitter anomaly timeseries from
// https://blog.twitter.com/2015/introducing-practical-and-robust-anomaly-detection-in-a-time-series
// This is a different computational approach, applied to the test data supplied with their R package.
// First, let's import some functions and source the data we'll use.
import "seasonal.juttle" as seasonal;
import "forecast.juttle" as FC;
source "https://gist.githubusercontent.com/davidbcook/8dc540293820238a8f9b/raw/f061ade7be99410bd38d8a27fd1b629a5bcc0339/twitter-anomaly.json"
// forecastPI takes several arguments: Y - the field of interest,
// z - the number of standard deviations to include in the prediction interval,
// alpha, slevel, strend - several smoothing parameters.
// You can read more about the function and the arguments here: https://gist.github.com/davidbcook/8dc540293820238a8f9b
| seasonal.remove_daily -from Date.new(0) -in 'count' -out 'daily'
| FC.forecastPI -Y "daily" -z 2 -alpha .2 -slevel .8 -strend null -sdaily null
| split daily_level, daily_err, daily_derr, count
| (
reduce -every :30 minutes: value = max(value) by name;
reduce -every :30 minutes: -on :15 minutes: value = min(value) by name;
)
| @timechart -display.dataDensity 0
// math.juttle: math utilities
//
// Exported subs:
//
// Exported functions:
// inv2x2(a,b,c,d): invert 2x2 matrix
// isFinite(x): is x a good number?
//
// Exported reducers:
//
/////////////////////////////////////////////////////////////////////////
//
// inv2x2:
// invert a 2x2 matrix | a, b ; c, d | (row major order)
//
export function inv2x2(a, b, c, d) {
var det = (a * d - b * c) ;
return [d / det, -b / det, -c / det, a / det] ;
}
//
// isFinite:
// verify x is a good number.
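// x == x is false for NaN, and x != x + 1 is false for +/-Infinity (the
// second test also rejects floats too large to resolve +1, which is fine here).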
//
export function isFinite(x) {
return (x == x && x != null && x != x + 1);
}
// seasonal.juttle: model repeating features ("seasons") in a series, at varying intervals and resolutions
//
// Exported subs:
// fit: generate points representing a seasonal fit to a series of points
// ... | fit -in 'cpu' -period :d: -N 24 -from from -out 'daily';
// hourly: fit a smoothly repeating hourly pattern, at 2-minute increments.
// ... | hourly -in in -out 'hourly' -from from
// daily: fit a smoothly repeating daily pattern, at hourly increments.
// ... | daily -in in -out 'daily' -from from
// weekly: fit a smoothly repeating weekly pattern, at 6-hour increments.
// ... | weekly -in in -out 'weekly' -from from
//
// Exported reducer:
// fit_coeffs: compute control points for best-fit periodic piecewise cubic spline
// return control point array that best fits the input series.
// ... | reduce -acc true -every :d: daily = fit_coeffs(in, :d:, 24)
// | ...
//
import "math.juttle" as math;
import "sources.juttle" as sources;
import "interp.juttle" as interp;
//
// fit_coeffs: Fit a periodic series of control points that define a best-fit piecewise cubic curve
// for a batch or window of points. This must be called with -acc true so that model state
// persists across epochs.
//
// parameters:
// in -- field to fit
// period -- duration of one cycle. The driving reducer -every/-over must equal this.
// N -- number of intervals to subdivide period for the piecewise fit (at most 30,
// the size of the control-point arrays below).
// returns vector [A, len, period, gain]
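// Each update() accumulates spline-weighted prediction error into dpts; each
// epoch, result() takes one gradient step, pts[i] += (gain * N / n_pts) * dpts[i],
// with gain decaying by gain_decay per epoch toward min_gain.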
//
export reducer fit_coeffs(in, period, N) {
var t0, n_pts, new_epoch;
var pts, dpts;
var interval = period / N;
var gain = 1.0, min_gain = .1, gain_decay = 0.8;
var initialized = false;
function initialize() {
// initial gain of 1.0 means we'll install a best-fit to the first period,
// so our initial values here aren't important. subsequent lower gains
// will allow drift over time.
pts = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
gain = gain / gain_decay ; // undo new_epoch's change
new_epoch = true;
initialized = true;
}
function update() {
var err, Wy, i_y, W;
if (!initialized) {
var syntaxerror = initialize(); // a bare call is a syntax error, hence the dummy assignment
}
if (new_epoch) {
// error gradients will sum over the period into dpts,
// then result() will take the step
dpts = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
n_pts = 0;
gain = Math.max(gain * gain_decay, min_gain);
new_epoch = false;
}
n_pts = n_pts + 1;
if (n_pts > 1) {
// evaluate the seasonal curve at this time
// XXX avoid the first point of the epoch until we've sorted out epsilon
Wy = interp.project_cubic(#time, pts, N, period);
err = *in - Wy[0]; // actual - expected seasonal value at this time
i_y = Wy[1]; // index into control point array
W = Wy[2]; // weights of control points at this time
// distribute the error over the control point subset for this time
dpts[ i_y ] = dpts[ i_y ] + err * W[0];
dpts[(i_y+1) % N] = dpts[(i_y+1) % N] + err * W[1];
dpts[(i_y+2) % N] = dpts[(i_y+2) % N] + err * W[2];
dpts[(i_y+3) % N] = dpts[(i_y+3) % N] + err * W[3];
}
}
function result() {
// gradient descent on the accumulated error. Oh for a loop.
var i=0, step = gain * N / n_pts ;
if (!new_epoch) {
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
new_epoch = true;
}
return [pts, N, period, gain];
}
function reset() {
new_epoch = true;
}
}
//
// evaluate the piecewise cubic curve represented by coeffs at the given time.
// coeffs are as returned by fit_coeffs.
//
export function value(t, coeffs, default) {
if (math.isFinite(coeffs[0][0])) {
return interp.cubic(t, coeffs[0], coeffs[1], coeffs[2]);
} else {
return default;
}
}
export sub fit(in, period, N, from, out) {
(
reduce -acc true -every period -on from __fit_tmp=fit_coeffs(in, period, N)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
export sub hourly(in, from, out) {
fit -in in -out out -period :h: -N 30 -from from
}
export sub daily(in, from, out) {
fit -in in -out out -period :d: -N 24 -from from
}
export sub weekly(in, from, out) {
fit -in in -out out -period :w: -N 28 -from from
}
export sub remove_hourly(in, from, out) {
hourly -in in -out '__hourly_tmp' -from from
| put *out = *in - __hourly_tmp
| remove __hourly_tmp
}
export sub remove_daily(in, from, out) {
daily -in in -out '__daily_tmp' -from from
| put *out = *in - __daily_tmp
| remove __daily_tmp
}
export sub remove_weekly(in, from, out) {
weekly -in in -out '__weekly_tmp' -from from
| put *out = *in - __weekly_tmp
| remove __weekly_tmp
}
//
// demo: daily periodics
//
const from = :2014-01-01: ;
const dur = :7w: ;
sources.bumpy_cpu -from from -to from + dur
| weekly -in 'cpu' -out 'weekly' -from from
| put deweek = cpu - weekly
| daily -in 'deweek' -out 'daily' -from from
| put deseason = deweek - daily
| split
| @timechart -title "de-seasonalization" -display.dataDensity 0
// sources.juttle: simulated sources for demos and tests
//
// Exported subs:
// bumpy_cpu: a 10-minute cpu metric with daily variation
// ripple_cpu: a 10-second cpu metric with minute-variation
//
// Exported functions:
//
// Exported reducers:
//
////////////////////////////////////////////////////////////////////////
//
// bumpy_cpu:
// a 10-minute cpu metric with noise and daily periodicity but no trend
//
function weekend_factor(time) {
if ((Date.get(time,'h') > 22 && Date.get(time, 'e') == 5)
|| Date.get(time, 'e') == 6
|| (Date.get(time,'h') < 6 && Date.get(time, 'e') == 1)) {
return 0.6 ;
} else if (Date.get(time, 'e') == 0) {
return 0.3 ;
} else {
return 1.0;
}
}
export sub bumpy_cpu(from, to) {
read -demo 'cdn' -nhosts 4 -dos .3 -dos_dur :15s: -daily 3
-from from -to to -every :10m:
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value" * weekend_factor(time)
| keep cpu, time
}
//
// ripple_cpu:
// a 10-second cpu metric with noise and daily periodicity but no trend
//
export sub ripple_cpu(from, to) {
read -demo 'cdn' -nhosts 4 -daily 0
-ripple .5 -ripple_dur 60 -ripple_levels [-0.25, -0.25, 0, 0, 0, 0, 0.5]
-from from -to to -every :10s:
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value"
| keep cpu, time
}
// example:
const start = :2014-01-01: ;
const dur = :1h: ;
bumpy_cpu -from :-2d: -to :now: | @timechart -title "2-day cpu variation"
;
ripple_cpu -from start -to start+dur
| @timechart -title "hourly cpu variation" -display.dataDensity 0
// model trends in a series via linear regression.
//
// Exported subs:
// fit: do a least-squares fit of a line to successive batches of points,
// and calculate the trend's value at each point. A fit is computed for each batch.
// ... | fit -in 'cpu' -every :2h: -from :2014-01-01: -out 'trend';
// fit_initial: do a least-squares fit of a line to an initial window of points,
// and calculate this trend's value at subsequent points. A fit is computed once.
// ... | fit_initial -in 'cpu' -initial :2h: -from :2014-01-01: -out 'trend';
// fit_trailing: do a least-squares fit of a line to a moving window of points
// and calculate the trend value at each point. A fit is computed from each point.
// ... | fit_trailing -in 'cpu' -over :2h: -from :2014-01-01: -out 'trend';
// rate: robust rate of change estimation by fitting a trend to trailing data.
// ... | rate -in 'cpu' -every :5m: -from :2014-01-01: -out 'rate' ;
// change: trend-based change over an interval. This is rate * dt and is in the same units as the input.
// ... | change -in 'cpu' -every :5m: -from :2014-01-01: -out 'change' ;
// subtract: de-trend a series of points or successive batches of points.
// ... | subtract -in 'cpu' -every :2h: -over :8h: -from :2014-01-01: -out 'detrend_2h' ;
// subtract_trailing: de-trend a series of points by re-fitting at each point.
// ... | subtract_trailing -in 'cpu' -over :2h: -from :2014-01-01: -out 'detrend_2h' ;
//
// Exported reducer:
// fit_coeffs: linear regression on a timeseries
// return coefficients of least-squares fit of a line to a specified field on a batch of points.
// ... | reduce line = fit_coeffs(y, t0)
// | put y0 = value(t0, line, null), y1 = value(t1, line, null)
// | ...
//////////////////////////////////////////////////////////////////////////////////////
import "sources.juttle" as sources;
import "math.juttle" as math;
//
// fit_coeffs: Linear regression vs time, for a batch or window of points.
// parameters:
// y -- field to fit
// t0 -- time origin for first fitted line
// returns vector [b, m, t0, n]
// b -- y-intercept; m -- slope (per second); t0 -- time origin; n -- number of points fit.
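// (result() solves the least-squares normal equations
// [ sum(t*t) sum(t) ; sum(t) n ] * [ m ; b ] = [ sum(t*y) ; sum(y) ]
// using the 2x2 inverse from math.juttle.)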
//
export reducer fit_coeffs(y, t0) {
var initial = true ;
var T, Y, T2, TY; // accumulate t, y, ... etc
var n;
function update() {
if (initial) {
T = Y = T2 = TY = n = 0;
initial = false;
}
var t = Duration.seconds(*"time" - t0) ;
T = T + t;
Y = Y + *y;
T2 = T2 + t * t;
TY = TY + t * *y;
n = n + 1;
}
function expire() {
var t = Duration.seconds(*"time" - t0) ;
T = T - t;
Y = Y - *y;
T2 = T2 - t * t;
TY = TY - t * *y;
n = n - 1;
}
function result() {
var inv = math.inv2x2(T2, T, T, n) ;
var m = inv[0] * TY + inv[1] * Y;
var b = inv[2] * TY + inv[3] * Y;
return [b, m, t0, n];
}
function reset() {
initial = true;
}
}
//
// value:
// evaluate the trendline represented by coeffs at the given time
//
export function value(t, coeffs, default) {
if (math.isFinite(coeffs[0]) && math.isFinite(coeffs[1])) {
return coeffs[0] + Duration.seconds(t - coeffs[2]) * coeffs[1];
} else {
return default;
}
}
//
// fit:
// construct a sequence of linear fits over epochs of duration -every, aligned to -from.
// output each point's projection onto the fitted line covering its epoch.
// parameters:
// in -- input field
// every -- data window for fitting
// from -- time origin for first fitted line
// out -- output field for projected line's value on each point
//
export sub fit(in, every, from, out) {
(
reduce -every every -over every -on from __fit_tmp=fit_coeffs(in, from)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// fit_initial:
// fit coefficients only once against the initial window,
// thereafter projecting against this same trend as time advances.
// For periodic data, this is a little dangerous if not run against an even
// multiple of the periodicity, because the extra partial-period data
// gives rise to a nonzero trend (phase-shift). But really, if you know you
// have periodic data, you shouldn't be fitting a trend!
//
// parameters:
// in -- input field
// initial -- duration of initial batch to fit
// from -- time origin for initial batch
// out -- output field for projected line's value on each point
//
export sub fit_initial(in, initial, from, out) {
(
reduce -every initial -over initial -on from -from from -to from+initial __fit_tmp=fit_coeffs(in, from)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// fit_trailing:
// construct a linear fit over a trailing window of points
// and put its value on the incoming point. A new fit is computed at each point.
// parameters:
// in -- input field
// over -- trailing window to fit trend over
// from -- time origin for first fitted line
// out -- output field for projected line's value
//
export sub fit_trailing(in, over, from, out) {
put -over over __fit_tmp = fit_coeffs(in, from)
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// subtract:
// de-trend intervals of points by subtracting their MSE linear fit.
// There will be a trend discontinuity at each refitting interval, so if
// this is a concern you should use subtract_trailing
// parameters:
// in -- field to de-trend
// every -- re-fitting interval
// over -- data window for fitting (>= every)
// from -- earliest time
// out -- output field for detrended value on each point
//
export sub subtract(in, every, over, from, out) {
fit -in in -every every -over over -from from -out '__subtract_tmp'
| put *out = *in - __subtract_tmp
| remove __subtract_tmp
}
//
// subtract_trailing:
// de-trend intervals of points by subtracting the MSE linear fit
// as each point arrives.
// parameters:
// in -- field to de-trend
// over -- data window for fitting
// from -- earliest time
// out -- output field for detrended value on each point
//
export sub subtract_trailing(in, over, from, out) {
fit_trailing -in in -over over -from from -out '__subtract_tmp'
| put *out = *in - __subtract_tmp
| remove __subtract_tmp
}
//
// rate:
// robustly estimate rate of change by fitting trendlines to a short moving
// window of points anchored at each successive point, and returning their slopes.
// parameters:
// in -- field to rate-estimate
// every -- time window for rate estimator (longer values give smoother derivatives)
// from -- earliest time
// out -- output field for rate (in/second) on each point
//
export sub rate(in, every, from, out) {
(
reduce -every every -over every -on from __rate_tmp=fit_coeffs(in, from)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = math.isFinite(__rate_tmp[1]) ? __rate_tmp[1] : null
| remove __rate_tmp
}
//
// change:
// estimate the amount of change over an interval due to a linear trend over the interval.
// parameters:
// in -- field to rate-estimate
// every -- time window for rate estimator (longer values give smoother derivatives)
// from -- earliest time
// out -- output field for amount change in this interval due to linear trend.
//
export sub change (in, every, from, out) {
rate -in in -every every -from from -out out
| put *out = *out * Duration.seconds(every)
}
//
// rmse:
// root mean square error of a line fitted to an interval. This is an estimate of how nonlinear the
// input was over an interval.
// parameters:
// in -- field to estimate
// every -- time window for rmse estimator
// from -- earliest time
// out -- output field
//
export sub rmse(in, every, from, out) {
fit -in in -every every -from from -out '__rmse_tmp'
| put __rmse_se = (*in - __rmse_tmp) * (*in - __rmse_tmp)
| reduce -every every __rmse_sse = sum(__rmse_se), __rmse_n = count()
| put *out = Math.sqrt(__rmse_sse / __rmse_n)
| remove __rmse_tmp, __rmse_se, __rmse_sse, __rmse_n
}
//
// demo: long trends
// fit trendlines to successive batches of points in various ways,
// over long intervals, for de-trending prior to de-seasonalizing and
// anomaly detection. Superimpose them on the original data.
//
const start = :2014-01-01: ;
const dur = :1w: ;
const every = :12h: ;
const over = :3d: ;
sources.bumpy_cpu -from start -to start + dur
| (
// fit a trend to an initial window of data, and apply it to all data going forward.
fit_initial -in 'cpu' -initial over -from start -out 'initial_trend'
| put detrend_initial = cpu - initial_trend ;
// fit a trend to a moving window of data anchored at the
// current point, updating it for each new point.
fit_trailing -in 'cpu' -over over -from start -out 'trend_trailing'
| put detrend_trailing = cpu - trend_trailing ;
)
| split
| @timechart -title "weekly trend" -display.dataDensity 0
;
//
// demo: use change to highlight rate-of-change anomalies
//
sources.ripple_cpu -from start -to start + :1h:
| change -in 'cpu' -every :60s: -from start -out 'rate'
| split
| @timechart -title "60s rate of change" -display.dataDensity 0