Skip to content

Instantly share code, notes, and snippets.

@welch
Last active August 29, 2015 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save welch/23c570fc361829b36a77 to your computer and use it in GitHub Desktop.
Save welch/23c570fc361829b36a77 to your computer and use it in GitHub Desktop.
juttle forecast modules: seasonality

Seasonality

In this example we fit seasonal curves to repeating daily and weekly patterns, and then subtract these patterns from the data. The remaining signal can be used as an input to a simple anomaly detector, without giving false-alarms for expected changes (like decreased levels over the weekend, or increases during daylight hours).

// periodic interpolation functions.
//
// Functions to generate continuous periodic values that interpolate specified
// discrete periodic values.
//
// Exported subs:
//
// Exported functions:
// project_time(time, n, period)
// pwl(time, A, len, period)
// cubic(time, A, len, period)
// project_cubic(time, A, len, period)
//
// Exported reducers:
//
//////////////////////////////////////////////////////////////////////////////////////
//
// project_time: project time onto a cycle of intervals of duration period/n.
// Return [idx, u] where 0 <= idx < n is the periodic interval this
// time falls into, and 0 <= u < 1 is the coordinate within the
// unit interval
//
export function project_time(time, n, period) {
var interval = period / n;
var t0 = Date.quantize(time, interval);
var u = (time - t0) / interval;
var idx = Math.floor((t0 - Date.new(0)) / interval) % n;
return [idx, u];
}
//
// pwl: piecewise-linear interpolation (periodic)
// interpolate values to get a continuous periodic function of time composed
// of lines connecting the values in A.
// A is an array of samples representing an overall cycle of duration period
// with samples uniformly spaced and s.t. A[0] should be the value at t==0.
//
export function pwl(time, A, len, period) {
var IDX = project_time(time, len, period);
var idx=IDX[0], u=IDX[1];
return (1 - u) * A[idx] + u * A[ (idx+1) % len ];
}
//
// catrom_weights: weighting for successive points for a given 0 <= u < 1
//
function catrom_weights(u) {
var u2 = u*u;
var u3 = u2 * u;
var W = [-u/2 + u2 - u3/2,
1 - 2.5 * u2 + 1.5 * u3,
u / 2 + 2 * u2 - 1.5 * u3,
-u2 / 2 + u3 / 2];
return W;
}
//
// project_cubic: cubic spline interpolation along with segment index and weights
// Return an array [val, idx, W], where
// val == W[0]*A[idx] + W[1]*A[idx+1] + W[2]*A[idx+2] + W[3]*A[idx+3], indexed modulo len
//
export function project_cubic(time, A, len, period) {
var IDX = project_time(time, len, period);
var idx=(IDX[0] + len - 1) % len, u=IDX[1];
var W = catrom_weights(u);
var val = W[0]*A[idx] + W[1]*A[(idx+1) % len] + W[2]*A[(idx+2) % len] + W[3]*A[(idx+3) % len];
return [val, idx, W];
}
//
// cubic: cubic spline interpolation (periodic)
// interpolate values to get a smooth periodic function of time.
// A is an array of samples representing an overall cycle of duration period
// with samples uniformly spaced and s.t. A[0] should be the value at t==0.
//
export function cubic(time, A, len, period) {
return project_cubic(time, A, len, period)[0];
}
//
// demo: draw two periods of a cycle, interpolating linearly and cubically
//
const values = [10, 5, 20, 15, 30];
const N = 5;
emitter -start 0 -limit 60
| put PWL = pwl(time, values, N, :30s:)
| put curve = cubic(time, values, N, :30s:)
| split
| @timechart
//
// demo: daily periodics
//
import "sources.juttle" as sources;
import "seasonal.juttle" as seasonal;
const from = :2014-01-01: ;
const dur = :5w: ;
sources.bumpy_cpu -from from -to from + dur
| seasonal.weekly -in 'cpu' -out 'weekly' -from from
| put deweek = cpu - weekly
| seasonal.remove_daily -in 'deweek' -out 'dedaily' -from from
| split cpu, weekly, dedaily
| @timechart -title "deseasonalization" -display.dataDensity 0
// math utilities
//
// Exported subs:
//
// Exported functions:
// inv2x2(a,b,c,d): invert 2x2 matrix
// isFinite(x): is x a good number?
//
// Exported reducers:
//
/////////////////////////////////////////////////////////////////////////
//
// inv2x2:
// invert a 2x2 matrix | a, b ; c, d | (row major order)
//
export function inv2x2(a, b, c, d) {
var det = (a * d - b * c) ;
return [d / det, -b / det, -c / det, a / det] ;
}
//
// isFinite:
// verify x is a good number.
//
export function isFinite(x) {
return (x == x && x != null && x != x + 1);
}
// model repeating features ("seasons") in a series, at varying intervals and resolutions
//
// Exported subs:
// fit: generate points representing a seasonal fit to a series of points
// ... | fit -in 'cpu' -period :d: -N 24 -out 'daily';
// hourly: fit a smoothly repeating hourly pattern, at 2-minute increments.
// ... | hourly -in in -out 'hourly' -from from
// daily: fit a smoothly repeating daily pattern, at hourly increments.
// ... | daily -in in -out 'daily' -from from
// weekly: fit a smoothly repeating weekly pattern, at 6-hour increments.
// ... | weekly -in in -out 'weekly' -from from
//
// Exported reducer:
// fit_coeffs: compute control points for best-fit periodic piecewise cubic spline
// return control point array that best fits the input series.
// ... | reduce daily = fit(in, :d:, 24)
// | ...
//
import "math.juttle" as math;
import "sources.juttle" as sources;
import "interp.juttle" as interp;
//
// fit_coeffs: Fit a periodic series of control points that define a best-fit piecewise cubic curve
// for a batch or window of points. This must be called with -acc true so that model state
// persists across epochs.
//
// parameters:
// in -- field to fit
// in0 -- estimated mean value of input field.
// period -- duration of one cycle. The driving reducer -every/-over must equal this.
// N -- number of intervals to subdivide period for piecewise fit.
// returns vector [A, len, period]
//
export reducer fit_coeffs(in, period, N) {
var t0, n_pts, new_epoch;
var pts, dpts;
var interval = period / N;
var gain = 1.0, min_gain = .1, gain_decay = 0.8;
var initialized = false;
function initialize() {
// initial gain of 1.0 means we'll install a best-fit to the first period,
// so our initial values here aren't important. subsequent lower gains
// will allow drift over time.
pts = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
gain = gain / gain_decay ; // undo new_epoch's change
new_epoch = true;
initialized = true;
}
function update() {
var err, Wy, i_y, W;
if (!initialized) {
var syntaxerror = initialize();
}
if (new_epoch) {
// error gradients will sum over the period into dpts,
// then result() will take the step
dpts = [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
];
n_pts = 0;
gain = Math.max(gain * gain_decay, min_gain);
new_epoch = false;
}
n_pts = n_pts + 1;
if (n_pts > 1) {
// evaluate the seasonal curve at this time
// XXX avoid the first point of the epoch until we've sorted out epsilon
Wy = interp.project_cubic(#time, pts, N, period);
err = *in - Wy[0]; // actual - expected seasonal value at this time
i_y = Wy[1]; // index into control point array
W = Wy[2]; // weights of control points at this time
// distribute the error over the control point subset for this time
dpts[ i_y ] = dpts[ i_y ] + err * W[0];
dpts[(i_y+1) % N] = dpts[(i_y+1) % N] + err * W[1];
dpts[(i_y+2) % N] = dpts[(i_y+2) % N] + err * W[2];
dpts[(i_y+3) % N] = dpts[(i_y+3) % N] + err * W[3];
}
}
function result() {
// gradient descent on the accumulated error. Oh for a loop.
var i=0, step = gain * N / n_pts ;
if (!new_epoch) {
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
pts[i] = pts[i] + dpts[i] * step; i = i + 1;
new_epoch = true;
}
return [pts, N, period, gain];
}
function reset() {
new_epoch = true;
}
}
//
// evaluate the piecewise cubic curve represented by coeffs at the given time.
// coeffs are as returned by fit_coeffs.
//
export function value(t, coeffs, default) {
if (math.isFinite(coeffs[0][0])) {
return interp.cubic(t, coeffs[0], coeffs[1], coeffs[2]);
} else {
return default;
}
}
export sub fit(in, period, N, from, out) {
(
reduce -acc true -every period -on from __fit_tmp=fit_coeffs(in, period, N)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from from -every :0s: ;
merge ;
)| join -onchange 2
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
export sub hourly(in, from, out) {
fit -in in -out out -period :h: -N 30 -from from
}
export sub daily(in, from, out) {
fit -in in -out out -period :d: -N 24 -from from
}
export sub weekly(in, from, out) {
fit -in in -out out -period :w: -N 28 -from from
}
export sub remove_hourly(in, from, out) {
hourly -in in -out '__hourly_tmp' -from from
| put *out = *in - __hourly_tmp
| remove __hourly_tmp
}
export sub remove_daily(in, from, out) {
daily -in in -out '__daily_tmp' -from from
| put *out = *in - __daily_tmp
| remove __daily_tmp
}
export sub remove_weekly(in, from, out) {
weekly -in in -out '__weekly_tmp' -from from
| put *out = *in - __weekly_tmp
| remove __weekly_tmp
}
//
// demo: daily periodics
//
const from = :2014-01-01: ;
const dur = :7w: ;
sources.bumpy_cpu -from from -to from + dur
| weekly -in 'cpu' -out 'weekly' -from from
| put deweek = cpu - weekly
| daily -in 'deweek' -out 'daily' -from from
| put deseason = deweek - daily
| split
| @timechart -title "de-seasonalization" -display.dataDensity 0
// simulated sources for demos and tests
//
// Exported subs:
// bumpy_cpu: a 10-minute cpu metric with daily variation
// ripple_cpu: a 10-second cpu metric with minute-variation
//
// Exported functions:
//
// Exported reducers:
//
////////////////////////////////////////////////////////////////////////
//
// bumpy_cpu:
// a 10-minute cpu metric with noise and daily periodicity but no trend
//
function weekend_factor(time) {
if ((Date.get(time,'h') > 22 && Date.get(time, 'e') == 5)
|| Date.get(time, 'e') == 6
|| (Date.get(time,'h') < 6 && Date.get(time, 'e') == 1)) {
return 0.6 ;
} else if (Date.get(time, 'e') == 0) {
return 0.3 ;
} else {
return 1.0;
}
}
export sub bumpy_cpu(from, to) {
read -demo 'cdn' -nhosts 4 -dos .3 -dos_dur :15s: -daily 3
-from from -to to -every :10m:
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value" * weekend_factor(time)
| keep cpu, time
}
//
// ripple_cpu:
// a 10-second cpu metric with noise and daily periodicity but no trend
//
export sub ripple_cpu(from, to) {
read -demo 'cdn' -nhosts 4 -daily 0
-ripple .5 -ripple_dur 60 -ripple_levels [-0.25, -0.25, 0, 0, 0, 0, 0.5]
-from from -to to -every :10s:
host ~ 'sjc*' name = 'cpu'
| put cpu=*"value"
| keep cpu, time
}
// example:
const start = :2014-01-01: ;
const dur = :1h: ;
bumpy_cpu -from :-2d: -to :now: | @timechart -title "2-day cpu variation"
;
ripple_cpu -from start -to start+dur
| @timechart -title "hourly cpu variation" -display.dataDensity 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment