Skip to content

Instantly share code, notes, and snippets.

@welch
Last active August 29, 2015 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save welch/6f2053f2bfc7d9e7b4d9 to your computer and use it in GitHub Desktop.
Save welch/6f2053f2bfc7d9e7b4d9 to your computer and use it in GitHub Desktop.
juttle trend module (standalone)
// model trends in a series via linear regression.
//
// Exported subs:
// fit: do a least-squares fit of a line to successive batches of points,
// and calculate the trend's value at each point. A fit is computed for each batch.
// ... | fit -in 'cpu' -every :2h: -over :8h: -t0 :2014-01-01: -out 'trend';
// fit_initial: do a least-squares fit of a line to an initial window of points,
// and calculate this trend's value at subsequent points. A fit is computed once.
// ... | fit_initial -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'trend';
// fit_trailing: do a least-squares fit of a line to a moving window of points
// and calculate the trend value at each point. A fit is computed from each point.
// ... | fit_trailing -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'trend';
// rate: robust rate of change estimation by fitting a trend to trailing data.
// ... | rate -in 'cpu' -dt :5m: -t0 :2014-01-01: -out 'rate' ;
// change: trend-based change over an interval. This is rate * dt and is in the same units as the input.
// ... | change -in 'cpu' -dt :5m: -t0 :2014-01-01: -out 'change' ;
// subtract: de-trend a series of points or successive batches of points.
// ... | subtract -in 'cpu' -every :2h: -over :8h: -t0 :2014-01-01: -out 'detrend_2h' ;
// subtract_trailing: de-trend a series of points by re-fitting at each point.
// ... | subtract_trailing -in 'cpu' -over :2h: -t0 :2014-01-01: -out 'detrend_2h' ;
//
// Exported reducer:
// fit_coeffs: linear regression on a timeseries
// return coefficients of least-squares fit of a line to a specified field on a batch of points.
// ... | reduce line = coeffs(y, t0)
// | put y0 = value(t0, coeffs), y1 = value(t1, coeffs)
// | ...
//////////////////////////////////////////////////////////////////////////////////////
//
// math utilities
//
// Exported subs:
//
// Exported functions:
// inv2x2(a,b,c,d): invert 2x2 matrix
// isFinite(x): is x a good number?
//
// Exported reducers:
//
/////////////////////////////////////////////////////////////////////////
//
// inv2x2:
// invert a 2x2 matrix | a, b ; c, d | (row major order)
//
export function inv2x2(a, b, c, d) {
var det = (a * d - b * c) ;
return [d / det, -b / det, -c / det, a / det] ;
}
//
// isFinite:
// verify x is a good number.
//
export function isFinite(x) {
return (x == x && x != null && x != x + 1);
}
//
// fit_coeffs: Linear regression vs time, for a batch or window of points.
// parameters:
// y -- field to fit
// t0 -- time origin for first fitted line
// returns vector [b, m, t0, n]
// b -- yintercept, m: slope (per second), n: number of points fit.
//
export reducer fit_coeffs(y, t0) {
var initial = true ;
var T, Y, T2, TY; // accumulate t, y, ... etc
var n;
function update() {
if (initial) {
T = Y = T2 = TY = n = 0;
initial = false;
}
var t = Duration.seconds(*"time" - t0) ;
T = T + t;
Y = Y + *y;
T2 = T2 + t * t;
TY = TY + t * *y;
n = n + 1;
}
function expire() {
var t = Duration.seconds(*"time" - t0) ;
T = T - t;
Y = Y - *y;
T2 = T2 - t * t;
TY = TY - t * *y;
n = n - 1;
}
function result() {
var inv = inv2x2(T2, T, T, n) ;
var m = inv[0] * TY + inv[1] * Y;
var b = inv[2] * TY + inv[3] * Y;
return [b, m, t0, n];
}
function reset() {
initial = true;
}
}
//
// value:
// evaluate the trendline represented by coeffs at the given time
//
export function value(t, coeffs, default) {
if (isFinite(coeffs[0]) && isFinite(coeffs[1])) {
return coeffs[0] + Duration.seconds(t - coeffs[2]) * coeffs[1];
} else {
return default;
}
}
//
// fit:
// construct a sequence of linear fits over epochs specified by -every@t0.
// output each point's projection onto the fitted line covering its epoch.
// parameters:
// in -- input field
// every -- re-fitting interval
// over -- data window for fitting (>= every)
// t0 -- time origin for first fitted line
// out -- output field for projected line's value on each point
//
export sub fit(in, every, over, t0, out) {
(
reduce -every every -over over -on t0 __fit_tmp=fit_coeffs(in, t0)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from t0 -every :0s: ;
merge ;
)| join
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// fit_initial:
// fit coefficients only once against the initial window,
// thereafter projecting against this same trend as time advances.
// For periodic data, this is a little dangerous if not run against an even
// multiple of the periodicity, because the extra partial-period data
// gives rise to a nonzero trend (phase-shift). But really, if you know you
// have periodic data, you shouldn't be fitting a trend!
//
// parameters:
// in -- input field
// initial -- duration of initial batch to fit
// t0 -- time origin for initial batch
// out -- output field for projected line's value on each point
//
export sub fit_initial(in, initial, t0, out) {
(
reduce -every initial -over initial -on t0 -from t0 -to t0+initial __fit_tmp=fit_coeffs(in, t0)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from t0 -every :0s: ;
merge ;
)| join
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// fit_trailing:
// construct a linear fit over a trailing window of points
// and put its value on the incoming point. A new fit is computed at each point.
// parameters:
// in -- input field
// over -- trailing window to fit trend over
// t0 -- time origin for first fitted line
// out -- output field for projected line's value
//
export sub fit_trailing(in, over, t0, out) {
put -over over __fit_tmp = fit_coeffs(in, t0)
| put *out = value(time, __fit_tmp, *in)
| remove __fit_tmp
}
//
// subtract:
// de-trend intervals of points by subtracting their MSE linear fit.
// There will be a trend discontinuity at each refitting interval, so if
// this is a concern you should use subtract_trailing
// parameters:
// in -- field to de-trend
// every -- re-fitting interval
// over -- data window for fitting (>= every)
// t0 -- earliest time
// out -- output field for detrended value on each point
//
export sub subtract(in, every, over, t0, out) {
fit -in in -every every -over over -t0 t0 -out '__subtract_tmp'
| put *out = *in - __subtract_tmp
| remove __subtract_tmp
}
//
// subtract_trailing:
// de-trend intervals of points by subtracting the MSE linear fit
// as each point arrives.
// parameters:
// in -- field to de-trend
// over -- data window for fitting
// t0 -- earliest time
// out -- output field for detrended value on each point
//
export sub subtract_trailing(in, over, t0, out) {
fit_trailing -in in -over over -t0 t0 -out '__subtract_tmp'
| put *out = *in - __subtract_tmp
| remove __subtract_tmp
}
//
// rate:
// robustly estimate rate of change by fitting trendlines to a short moving
// window of points anchored at each successive point, and returning their slopes.
// parameters:
// in -- field to rate-estimate
// dt -- time window for rate estimator (longer values give smoother derivatives)
// t0 -- earliest time
// out -- output field for rate (in/second) on each point
//
export sub rate(in, dt, t0, out) {
(
reduce -every dt -over dt -on t0 __dt_tmp=fit_coeffs(in, t0)
// XXX: we don't have a lag operator, so use pace to send the result back in
// time to beginning of its batch so it can be joined with the data it was fitted to.
| pace -from t0 -every :0s: ;
merge ;
)| join
| put *out = isFinite(__dt_tmp[1]) ? __dt_tmp[1] : null
| remove __dt_tmp
}
//
// change:
// estimate the amount of change over an interval due to a linear trend over the interval.
// parameters:
// in -- field to rate-estimate
// dt -- time window for rate estimator (longer values give smoother derivatives)
// t0 -- earliest time
// out -- output field for amount change in this interval due to linear trend.
//
export sub change (in, dt, t0, out) {
rate -in in -dt dt -t0 t0 -out out
| put *out = *out * Duration.seconds(dt)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment